/*
 * net/tipc/group.c: TIPC group messaging code
 *
 * Copyright (c) 2017, Ericsson AB
 * Copyright (c) 2020, Red Hat Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "addr.h"
#include "group.h"
#include "bcast.h"
#include "topsrv.h"
#include "msg.h"
#include "socket.h"
#include "node.h"
#include "name_table.h"
#include "subscr.h"

#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE ADV_UNIT
#define ADV_ACTIVE (ADV_UNIT * 12)

enum mbr_state {
    MBR_JOINING,
    MBR_PUBLISHED,
    MBR_JOINED,
    MBR_PENDING,
    MBR_ACTIVE,
    MBR_RECLAIMING,
    MBR_REMITTED,
    MBR_LEAVING
};

struct tipc_member {
    struct rb_node tree_node;
    struct list_head list;
    struct list_head small_win;
    struct sk_buff_head deferredq;
    struct tipc_group *group;
    u32 node;
    u32 port;
    u32 instance;
    enum mbr_state state;
    u16 advertised;
    u16 window;
    u16 bc_rcv_nxt;
    u16 bc_syncpt;
    u16 bc_acked;
};

struct tipc_group {
    struct rb_root members;
    struct list_head small_win;
    struct list_head pending;
    struct list_head active;
    struct tipc_nlist dests;
    struct net *net;
    int subid;
    u32 type;
    u32 instance;
    u32 scope;
    u32 portid;
    u16 member_cnt;
    u16 active_cnt;
    u16 max_active;
    u16 bc_snd_nxt;
    u16 bc_ackers;
    bool *open;
    bool loopback;
    bool events;
};

static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
                  int mtyp, struct sk_buff_head *xmitq);

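/* tipc_group_open() - remove member from small-window list and mark the
 * group as open for sending; *wakeup is set if a waiting user should be woken
 */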
static void tipc_group_open(struct tipc_member *m, bool *wakeup)
{
    *wakeup = false;
    if (list_empty(&m->small_win))
        return;
    list_del_init(&m->small_win);
    *m->group->open = true;
    *wakeup = true;
}

static void tipc_group_decr_active(struct tipc_group *grp,
                   struct tipc_member *m)
{
    if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING ||
        m->state == MBR_REMITTED)
        grp->active_cnt--;
}

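/* tipc_group_rcvbuf_limit() - calculate the socket receive buffer limit
 * needed to accommodate the advertised windows of all members
 */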
static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
{
    int max_active, active_pool, idle_pool;
    int mcnt = grp->member_cnt + 1;

    /* Limit simultaneous reception from other members */
    max_active = min(mcnt / 8, 64);
    max_active = max(max_active, 16);
    grp->max_active = max_active;

    /* Reserve blocks for active and idle members */
    active_pool = max_active * ADV_ACTIVE;
    idle_pool = (mcnt - max_active) * ADV_IDLE;

    /* Scale to bytes, considering worst-case truesize/msgsize ratio */
    return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
}

u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
{
    return grp->bc_snd_nxt;
}

static bool tipc_group_is_receiver(struct tipc_member *m)
{
    return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
}

static bool tipc_group_is_sender(struct tipc_member *m)
{
    return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
}

u32 tipc_group_exclude(struct tipc_group *grp)
{
    if (!grp->loopback)
        return grp->portid;
    return 0;
}

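/* tipc_group_create() - allocate and initialize a group instance for the
 * socket 'portid' and subscribe to its service type in the topology server
 */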
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
                     struct tipc_group_req *mreq,
                     bool *group_is_open)
{
    u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
    bool global = mreq->scope != TIPC_NODE_SCOPE;
    struct tipc_group *grp;
    u32 type = mreq->type;

    grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
    if (!grp)
        return NULL;
    tipc_nlist_init(&grp->dests, tipc_own_addr(net));
    INIT_LIST_HEAD(&grp->small_win);
    INIT_LIST_HEAD(&grp->active);
    INIT_LIST_HEAD(&grp->pending);
    grp->members = RB_ROOT;
    grp->net = net;
    grp->portid = portid;
    grp->type = type;
    grp->instance = mreq->instance;
    grp->scope = mreq->scope;
    grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
    grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
    grp->open = group_is_open;
    *grp->open = false;
    filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
    if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
                    filter, &grp->subid))
        return grp;
    kfree(grp);
    return NULL;
}

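/* tipc_group_join() - send a JOIN message to all known members and set the
 * socket receive buffer limit accordingly
 */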
void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
{
    struct rb_root *tree = &grp->members;
    struct tipc_member *m, *tmp;
    struct sk_buff_head xmitq;

    __skb_queue_head_init(&xmitq);
    rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
        tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
        tipc_group_update_member(m, 0);
    }
    tipc_node_distr_xmit(net, &xmitq);
    *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
}

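/* tipc_group_delete() - send LEAVE to all members, release all member
 * structures and cancel the topology subscription
 */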
void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
    struct rb_root *tree = &grp->members;
    struct tipc_member *m, *tmp;
    struct sk_buff_head xmitq;

    __skb_queue_head_init(&xmitq);

    rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
        tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
        __skb_queue_purge(&m->deferredq);
        list_del(&m->list);
        kfree(m);
    }
    tipc_node_distr_xmit(net, &xmitq);
    tipc_nlist_purge(&grp->dests);
    tipc_topsrv_kern_unsubscr(net, grp->subid);
    kfree(grp);
}

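/* tipc_group_find_member() - look up a member by <node, port>, which form
 * the 64-bit key of the member rbtree
 */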
static struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
                          u32 node, u32 port)
{
    struct rb_node *n = grp->members.rb_node;
    u64 nkey, key = (u64)node << 32 | port;
    struct tipc_member *m;

    while (n) {
        m = container_of(n, struct tipc_member, tree_node);
        nkey = (u64)m->node << 32 | m->port;
        if (key < nkey)
            n = n->rb_left;
        else if (key > nkey)
            n = n->rb_right;
        else
            return m;
    }
    return NULL;
}

static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
                        u32 node, u32 port)
{
    struct tipc_member *m;

    m = tipc_group_find_member(grp, node, port);
    if (m && tipc_group_is_receiver(m))
        return m;
    return NULL;
}

static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
                        u32 node)
{
    struct tipc_member *m;
    struct rb_node *n;

    for (n = rb_first(&grp->members); n; n = rb_next(n)) {
        m = container_of(n, struct tipc_member, tree_node);
        if (m->node == node)
            return m;
    }
    return NULL;
}

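/* tipc_group_add_to_tree() - insert member into rbtree, keyed on <node, port>;
 * returns -EEXIST if a member with the same key is already present
 */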
static int tipc_group_add_to_tree(struct tipc_group *grp,
                  struct tipc_member *m)
{
    u64 nkey, key = (u64)m->node << 32 | m->port;
    struct rb_node **n, *parent = NULL;
    struct tipc_member *tmp;

    n = &grp->members.rb_node;
    while (*n) {
        tmp = container_of(*n, struct tipc_member, tree_node);
        parent = *n;
        tmp = container_of(parent, struct tipc_member, tree_node);
        nkey = (u64)tmp->node << 32 | tmp->port;
        if (key < nkey)
            n = &(*n)->rb_left;
        else if (key > nkey)
            n = &(*n)->rb_right;
        else
            return -EEXIST;
    }
    rb_link_node(&m->tree_node, parent, n);
    rb_insert_color(&m->tree_node, &grp->members);
    return 0;
}

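/* tipc_group_create_member() - allocate a member, initialize its queues and
 * broadcast state, and add it to the member tree and destination node list
 */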
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
                            u32 node, u32 port,
                            u32 instance, int state)
{
    struct tipc_member *m;
    int ret;

    m = kzalloc(sizeof(*m), GFP_ATOMIC);
    if (!m)
        return NULL;
    INIT_LIST_HEAD(&m->list);
    INIT_LIST_HEAD(&m->small_win);
    __skb_queue_head_init(&m->deferredq);
    m->group = grp;
    m->node = node;
    m->port = port;
    m->instance = instance;
    m->bc_acked = grp->bc_snd_nxt - 1;
    ret = tipc_group_add_to_tree(grp, m);
    if (ret < 0) {
        kfree(m);
        return NULL;
    }
    grp->member_cnt++;
    tipc_nlist_add(&grp->dests, m->node);
    m->state = state;
    return m;
}

void tipc_group_add_member(struct tipc_group *grp, u32 node,
               u32 port, u32 instance)
{
    tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
}

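/* tipc_group_delete_member() - unlink member from tree and lists, adjust the
 * pending broadcast ack count and drop its node from the destination list
 * if it was the last member on that node
 */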
static void tipc_group_delete_member(struct tipc_group *grp,
                     struct tipc_member *m)
{
    rb_erase(&m->tree_node, &grp->members);
    grp->member_cnt--;

    /* Check if we were waiting for replicast ack from this member */
    if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
        grp->bc_ackers--;

    list_del_init(&m->list);
    list_del_init(&m->small_win);
    tipc_group_decr_active(grp, m);

    /* If last member on a node, remove node from dest list */
    if (!tipc_group_find_node(grp, m->node))
        tipc_nlist_del(&grp->dests, m->node);

    kfree(m);
}

struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
{
    return &grp->dests;
}

void tipc_group_self(struct tipc_group *grp, struct tipc_service_range *seq,
             int *scope)
{
    seq->type = grp->type;
    seq->lower = grp->instance;
    seq->upper = grp->instance;
    *scope = grp->scope;
}

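/* tipc_group_update_member() - charge 'len' against the member's send window
 * and sort it into the small-window list if the window drops below ADV_IDLE
 */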
void tipc_group_update_member(struct tipc_member *m, int len)
{
    struct tipc_group *grp = m->group;
    struct tipc_member *_m, *tmp;

    if (!tipc_group_is_receiver(m))
        return;

    m->window -= len;

    if (m->window >= ADV_IDLE)
        return;

    list_del_init(&m->small_win);

    /* Sort member into small_window members' list */
    list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) {
        if (_m->window > m->window)
            break;
    }
    list_add_tail(&m->small_win, &_m->small_win);
}

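/* tipc_group_update_bc_members() - charge an outgoing broadcast against all
 * receiving members, note the number of expected acks if requested, and
 * advance the broadcast send sequence number
 */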
void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
{
    u16 prev = grp->bc_snd_nxt - 1;
    struct tipc_member *m;
    struct rb_node *n;
    u16 ackers = 0;

    for (n = rb_first(&grp->members); n; n = rb_next(n)) {
        m = container_of(n, struct tipc_member, tree_node);
        if (tipc_group_is_receiver(m)) {
            tipc_group_update_member(m, len);
            m->bc_acked = prev;
            ackers++;
        }
    }

    /* Mark number of acknowledges to expect, if any */
    if (ack)
        grp->bc_ackers = ackers;
    grp->bc_snd_nxt++;
}

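/* tipc_group_cong() - check if a unicast of 'len' blocks to <dnode, dport>
 * would overrun the member's window; on congestion, advertise more window
 * if the member is not fully advertised, to prevent mutual blocking
 */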
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
             int len, struct tipc_member **mbr)
{
    struct sk_buff_head xmitq;
    struct tipc_member *m;
    int adv, state;

    m = tipc_group_find_dest(grp, dnode, dport);
    if (!tipc_group_is_receiver(m)) {
        *mbr = NULL;
        return false;
    }
    *mbr = m;

    if (m->window >= len)
        return false;

    *grp->open = false;

    /* If not fully advertised, do it now to prevent mutual blocking */
    adv = m->advertised;
    state = m->state;
    if (state == MBR_JOINED && adv == ADV_IDLE)
        return true;
    if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
        return true;
    if (state == MBR_PENDING && adv == ADV_IDLE)
        return true;
    __skb_queue_head_init(&xmitq);
    tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
    tipc_node_distr_xmit(grp->net, &xmitq);
    return true;
}

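/* tipc_group_bc_cong() - check if a broadcast of 'len' blocks must wait,
 * either because a previous replicast has unacked receivers or because the
 * member with the smallest window is congested
 */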
bool tipc_group_bc_cong(struct tipc_group *grp, int len)
{
    struct tipc_member *m = NULL;

    /* If prev bcast was replicast, reject until all receivers have acked */
    if (grp->bc_ackers) {
        *grp->open = false;
        return true;
    }
    if (list_empty(&grp->small_win))
        return false;

    m = list_first_entry(&grp->small_win, struct tipc_member, small_win);
    if (m->window >= len)
        return false;

    return tipc_group_cong(grp, m->node, m->port, len, &m);
}

/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
 */
static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
{
    struct tipc_msg *_hdr, *hdr = buf_msg(skb);
    u16 bc_seqno = msg_grp_bc_seqno(hdr);
    struct sk_buff *_skb, *tmp;
    int mtyp = msg_type(hdr);

    /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
    if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
        skb_queue_walk_safe(defq, _skb, tmp) {
            _hdr = buf_msg(_skb);
            if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
                continue;
            __skb_queue_before(defq, _skb, skb);
            return;
        }
        /* Bcast was not bypassed, - add to tail */
    }
    /* Unicasts are never bypassed, - always add to tail */
    __skb_queue_tail(defq, skb);
}

/* tipc_group_filter_msg() - determine if we should accept arriving message
 */
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
               struct sk_buff_head *xmitq)
{
    struct sk_buff *skb = __skb_dequeue(inputq);
    bool ack, deliver, update, leave = false;
    struct sk_buff_head *defq;
    struct tipc_member *m;
    struct tipc_msg *hdr;
    u32 node, port;
    int mtyp, blks;

    if (!skb)
        return;

    hdr = buf_msg(skb);
    node = msg_orignode(hdr);
    port = msg_origport(hdr);

    if (!msg_in_group(hdr))
        goto drop;

    m = tipc_group_find_member(grp, node, port);
    if (!tipc_group_is_sender(m))
        goto drop;

    if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
        goto drop;

    TIPC_SKB_CB(skb)->orig_member = m->instance;
    defq = &m->deferredq;
    tipc_group_sort_msg(skb, defq);

    while ((skb = skb_peek(defq))) {
        hdr = buf_msg(skb);
        mtyp = msg_type(hdr);
        blks = msg_blocks(hdr);
        deliver = true;
        ack = false;
        update = false;

        if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
            break;

        /* Decide what to do with message */
        switch (mtyp) {
        case TIPC_GRP_MCAST_MSG:
            if (msg_nameinst(hdr) != grp->instance) {
                update = true;
                deliver = false;
            }
            fallthrough;
        case TIPC_GRP_BCAST_MSG:
            m->bc_rcv_nxt++;
            ack = msg_grp_bc_ack_req(hdr);
            break;
        case TIPC_GRP_UCAST_MSG:
            break;
        case TIPC_GRP_MEMBER_EVT:
            if (m->state == MBR_LEAVING)
                leave = true;
            if (!grp->events)
                deliver = false;
            break;
        default:
            break;
        }

        /* Execute decisions */
        __skb_dequeue(defq);
        if (deliver)
            __skb_queue_tail(inputq, skb);
        else
            kfree_skb(skb);

        if (ack)
            tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);

        if (leave) {
            __skb_queue_purge(defq);
            tipc_group_delete_member(grp, m);
            break;
        }
        if (!update)
            continue;

        tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
    }
    return;
drop:
    kfree_skb(skb);
}

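/* tipc_group_update_rcv_win() - receive side flow control: after 'blks'
 * blocks have been consumed from member <node, port>, decide if it goes
 * ACTIVE or PENDING, reclaim window from the oldest active member when
 * approaching the active limit, and re-advertise window as needed
 */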
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
                   u32 port, struct sk_buff_head *xmitq)
{
    struct list_head *active = &grp->active;
    int max_active = grp->max_active;
    int reclaim_limit = max_active * 3 / 4;
    int active_cnt = grp->active_cnt;
    struct tipc_member *m, *rm, *pm;

    m = tipc_group_find_member(grp, node, port);
    if (!m)
        return;

    m->advertised -= blks;

    switch (m->state) {
    case MBR_JOINED:
        /* First, decide if member can go active */
        if (active_cnt <= max_active) {
            m->state = MBR_ACTIVE;
            list_add_tail(&m->list, active);
            grp->active_cnt++;
            tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
        } else {
            m->state = MBR_PENDING;
            list_add_tail(&m->list, &grp->pending);
        }

        if (active_cnt < reclaim_limit)
            break;

        /* Reclaim from oldest active member, if possible */
        if (!list_empty(active)) {
            rm = list_first_entry(active, struct tipc_member, list);
            rm->state = MBR_RECLAIMING;
            list_del_init(&rm->list);
            tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
            break;
        }
        /* Nobody to reclaim from; - revert oldest pending to JOINED */
        pm = list_first_entry(&grp->pending, struct tipc_member, list);
        list_del_init(&pm->list);
        pm->state = MBR_JOINED;
        tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
        break;
    case MBR_ACTIVE:
        if (!list_is_last(&m->list, &grp->active))
            list_move_tail(&m->list, &grp->active);
        if (m->advertised > (ADV_ACTIVE * 3 / 4))
            break;
        tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
        break;
    case MBR_REMITTED:
        if (m->advertised > ADV_IDLE)
            break;
        m->state = MBR_JOINED;
        grp->active_cnt--;
        if (m->advertised < ADV_IDLE) {
            pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
            tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
        }

        if (list_empty(&grp->pending))
            return;

        /* Set oldest pending member to active and advertise */
        pm = list_first_entry(&grp->pending, struct tipc_member, list);
        pm->state = MBR_ACTIVE;
        list_move_tail(&pm->list, &grp->active);
        grp->active_cnt++;
        tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
        break;
    case MBR_RECLAIMING:
    case MBR_JOINING:
    case MBR_LEAVING:
    default:
        break;
    }
}

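/* tipc_group_create_event() - create a TIPC_GRP_MEMBER_EVT message carrying
 * a struct tipc_event for member 'm' and queue it towards the owning socket
 */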
static void tipc_group_create_event(struct tipc_group *grp,
                    struct tipc_member *m,
                    u32 event, u16 seqno,
                    struct sk_buff_head *inputq)
{
    u32 dnode = tipc_own_addr(grp->net);
    struct tipc_event evt;
    struct sk_buff *skb;
    struct tipc_msg *hdr;

    memset(&evt, 0, sizeof(evt));
    evt.event = event;
    evt.found_lower = m->instance;
    evt.found_upper = m->instance;
    evt.port.ref = m->port;
    evt.port.node = m->node;
    evt.s.seq.type = grp->type;
    evt.s.seq.lower = m->instance;
    evt.s.seq.upper = m->instance;

    skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
                  GROUP_H_SIZE, sizeof(evt), dnode, m->node,
                  grp->portid, m->port, 0);
    if (!skb)
        return;

    hdr = buf_msg(skb);
    msg_set_nametype(hdr, grp->type);
    msg_set_grp_evt(hdr, event);
    msg_set_dest_droppable(hdr, true);
    msg_set_grp_bc_seqno(hdr, seqno);
    memcpy(msg_data(hdr), &evt, sizeof(evt));
    TIPC_SKB_CB(skb)->orig_member = m->instance;
    __skb_queue_tail(inputq, skb);
}

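/* tipc_group_proto_xmit() - create a group protocol message of type 'mtyp'
 * to member 'm', advertising window and/or carrying sync point, ack or
 * remit values as required, and append it to xmitq
 */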
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
                  int mtyp, struct sk_buff_head *xmitq)
{
    struct tipc_msg *hdr;
    struct sk_buff *skb;
    int adv = 0;

    skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
                  m->node, tipc_own_addr(grp->net),
                  m->port, grp->portid, 0);
    if (!skb)
        return;

    if (m->state == MBR_ACTIVE)
        adv = ADV_ACTIVE - m->advertised;
    else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
        adv = ADV_IDLE - m->advertised;

    hdr = buf_msg(skb);

    if (mtyp == GRP_JOIN_MSG) {
        msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
        msg_set_adv_win(hdr, adv);
        m->advertised += adv;
    } else if (mtyp == GRP_LEAVE_MSG) {
        msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
    } else if (mtyp == GRP_ADV_MSG) {
        msg_set_adv_win(hdr, adv);
        m->advertised += adv;
    } else if (mtyp == GRP_ACK_MSG) {
        msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
    } else if (mtyp == GRP_REMIT_MSG) {
        msg_set_grp_remitted(hdr, m->window);
    }
    msg_set_dest_droppable(hdr, true);
    __skb_queue_tail(xmitq, skb);
}

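/* tipc_group_proto_rcv() - process an incoming group protocol message
 * (JOIN/LEAVE/ADV/ACK/RECLAIM/REMIT) and update the peer member's state,
 * window and broadcast bookkeeping accordingly
 */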
void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
              struct tipc_msg *hdr, struct sk_buff_head *inputq,
              struct sk_buff_head *xmitq)
{
    u32 node = msg_orignode(hdr);
    u32 port = msg_origport(hdr);
    struct tipc_member *m, *pm;
    u16 remitted, in_flight;

    if (!grp)
        return;

    if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
        return;

    m = tipc_group_find_member(grp, node, port);

    switch (msg_type(hdr)) {
    case GRP_JOIN_MSG:
        if (!m)
            m = tipc_group_create_member(grp, node, port,
                             0, MBR_JOINING);
        if (!m)
            return;
        m->bc_syncpt = msg_grp_bc_syncpt(hdr);
        m->bc_rcv_nxt = m->bc_syncpt;
        m->window += msg_adv_win(hdr);

        /* Wait until PUBLISH event is received if necessary */
        if (m->state != MBR_PUBLISHED)
            return;

        /* Member can be taken into service */
        m->state = MBR_JOINED;
        tipc_group_open(m, usr_wakeup);
        tipc_group_update_member(m, 0);
        tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
        tipc_group_create_event(grp, m, TIPC_PUBLISHED,
                    m->bc_syncpt, inputq);
        return;
    case GRP_LEAVE_MSG:
        if (!m)
            return;
        m->bc_syncpt = msg_grp_bc_syncpt(hdr);
        list_del_init(&m->list);
        tipc_group_open(m, usr_wakeup);
        tipc_group_decr_active(grp, m);
        m->state = MBR_LEAVING;
        tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
                    m->bc_syncpt, inputq);
        return;
    case GRP_ADV_MSG:
        if (!m)
            return;
        m->window += msg_adv_win(hdr);
        tipc_group_open(m, usr_wakeup);
        return;
    case GRP_ACK_MSG:
        if (!m)
            return;
        m->bc_acked = msg_grp_bc_acked(hdr);
        if (--grp->bc_ackers)
            return;
        list_del_init(&m->small_win);
        *m->group->open = true;
        *usr_wakeup = true;
        tipc_group_update_member(m, 0);
        return;
    case GRP_RECLAIM_MSG:
        if (!m)
            return;
        tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
        m->window = ADV_IDLE;
        tipc_group_open(m, usr_wakeup);
        return;
    case GRP_REMIT_MSG:
        if (!m || m->state != MBR_RECLAIMING)
            return;

        remitted = msg_grp_remitted(hdr);

        /* Messages preceding the REMIT still in receive queue */
        if (m->advertised > remitted) {
            m->state = MBR_REMITTED;
            in_flight = m->advertised - remitted;
            m->advertised = ADV_IDLE + in_flight;
            return;
        }
        /* This should never happen */
        if (m->advertised < remitted)
            pr_warn_ratelimited("Unexpected REMIT msg\n");

        /* All messages preceding the REMIT have been read */
        m->state = MBR_JOINED;
        grp->active_cnt--;
        m->advertised = ADV_IDLE;

        /* Set oldest pending member to active and advertise */
        if (list_empty(&grp->pending))
            return;
        pm = list_first_entry(&grp->pending, struct tipc_member, list);
        pm->state = MBR_ACTIVE;
        list_move_tail(&pm->list, &grp->active);
        grp->active_cnt++;
        if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
            tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
        return;
    default:
        pr_warn("Received unknown GROUP_PROTO message\n");
    }
}

/* tipc_group_member_evt() - receive and handle a member up/down event
 */
void tipc_group_member_evt(struct tipc_group *grp,
               bool *usr_wakeup,
               int *sk_rcvbuf,
               struct tipc_msg *hdr,
               struct sk_buff_head *inputq,
               struct sk_buff_head *xmitq)
{
    struct tipc_event *evt = (void *)msg_data(hdr);
    u32 instance = evt->found_lower;
    u32 node = evt->port.node;
    u32 port = evt->port.ref;
    int event = evt->event;
    struct tipc_member *m;
    struct net *net;
    u32 self;

    if (!grp)
        return;

    net = grp->net;
    self = tipc_own_addr(net);
    if (!grp->loopback && node == self && port == grp->portid)
        return;

    m = tipc_group_find_member(grp, node, port);

    switch (event) {
    case TIPC_PUBLISHED:
        /* Send and wait for arrival of JOIN message if necessary */
        if (!m) {
            m = tipc_group_create_member(grp, node, port, instance,
                             MBR_PUBLISHED);
            if (!m)
                break;
            tipc_group_update_member(m, 0);
            tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
            break;
        }

        if (m->state != MBR_JOINING)
            break;

        /* Member can be taken into service */
        m->instance = instance;
        m->state = MBR_JOINED;
        tipc_group_open(m, usr_wakeup);
        tipc_group_update_member(m, 0);
        tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
        tipc_group_create_event(grp, m, TIPC_PUBLISHED,
                    m->bc_syncpt, inputq);
        break;
    case TIPC_WITHDRAWN:
        if (!m)
            break;

        tipc_group_decr_active(grp, m);
        m->state = MBR_LEAVING;
        list_del_init(&m->list);
        tipc_group_open(m, usr_wakeup);

        /* Only send event if no LEAVE message can be expected */
        if (!tipc_node_is_up(net, node))
            tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
                        m->bc_rcv_nxt, inputq);
        break;
    default:
        break;
    }
    *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
}

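/* tipc_group_fill_sock_diag() - add group attributes to a socket diag dump
 */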
int tipc_group_fill_sock_diag(struct tipc_group *grp, struct sk_buff *skb)
{
    struct nlattr *group = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_GROUP);

    if (!group)
        return -EMSGSIZE;

    if (nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_ID,
            grp->type) ||
        nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_INSTANCE,
            grp->instance) ||
        nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_BC_SEND_NEXT,
            grp->bc_snd_nxt))
        goto group_msg_cancel;

    if (grp->scope == TIPC_NODE_SCOPE)
        if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_NODE_SCOPE))
            goto group_msg_cancel;

    if (grp->scope == TIPC_CLUSTER_SCOPE)
        if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_CLUSTER_SCOPE))
            goto group_msg_cancel;

    if (*grp->open)
        if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_OPEN))
            goto group_msg_cancel;

    nla_nest_end(skb, group);
    return 0;

group_msg_cancel:
    nla_nest_cancel(skb, group);
    return -1;
}