0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include "core.h"
0038 #include "addr.h"
0039 #include "group.h"
0040 #include "bcast.h"
0041 #include "topsrv.h"
0042 #include "msg.h"
0043 #include "socket.h"
0044 #include "node.h"
0045 #include "name_table.h"
0046 #include "subscr.h"
0047
/* Flow control window sizes, expressed in units of FLOWCTL_BLK_SZ.
 * ADV_UNIT is the integer quotient of (MAX_MSG_SIZE + MAX_H_SIZE) by
 * FLOWCTL_BLK_SZ, plus one.
 */
0048 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
/* Target window advertised to a member in state MBR_JOINED or MBR_PENDING */
0049 #define ADV_IDLE ADV_UNIT
/* Target window advertised to a member in state MBR_ACTIVE */
0050 #define ADV_ACTIVE (ADV_UNIT * 12)
0051
/* States of a peer member, as tracked in struct tipc_member::state */
0052 enum mbr_state {
/* Created on reception of GRP_JOIN_MSG from a not yet known member */
0053 MBR_JOINING,
/* Created on a TIPC_PUBLISHED event or via tipc_group_add_member() */
0054 MBR_PUBLISHED,
/* Both the publication and the JOIN message have been seen */
0055 MBR_JOINED,
/* Sent us traffic while too many members were active; on grp->pending */
0056 MBR_PENDING,
/* On grp->active; is advertised up to ADV_ACTIVE */
0057 MBR_ACTIVE,
/* GRP_RECLAIM_MSG was sent to it; GRP_REMIT_MSG is only accepted here */
0058 MBR_RECLAIMING,
/* REMIT received while more was advertised than the peer remitted */
0059 MBR_REMITTED,
/* GRP_LEAVE_MSG or a TIPC_WITHDRAWN event was received */
0060 MBR_LEAVING
0061 };
0062
/* struct tipc_member - this group endpoint's record of one peer member.
 * A member is identified by its (node, port) pair.
 */
0063 struct tipc_member {
/* Linkage in grp->members; the tree is keyed on (node << 32 | port) */
0064 struct rb_node tree_node;
/* Linkage in either grp->active or grp->pending */
0065 struct list_head list;
/* Linkage in grp->small_win, kept sorted on ascending 'window' */
0066 struct list_head small_win;
/* Messages from this member, held until deliverable in sequence */
0067 struct sk_buff_head deferredq;
/* Back pointer to the owning group */
0068 struct tipc_group *group;
0069 u32 node;
0070 u32 port;
/* Service instance; taken from the publication event when available */
0071 u32 instance;
0072 enum mbr_state state;
/* Window we have advertised to this member; reduced as messages arrive */
0073 u16 advertised;
/* Send window towards this member; grows with received advertisements */
0074 u16 window;
/* Next broadcast sequence number expected from this member */
0075 u16 bc_rcv_nxt;
/* Sequence number carried in the member's last JOIN or LEAVE message */
0076 u16 bc_syncpt;
/* Value from the member's last GRP_ACK_MSG, or set when we broadcast */
0077 u16 bc_acked;
0078 };
0079
/* struct tipc_group - state of one group membership held by a socket/port */
0080 struct tipc_group {
/* All known peer members, keyed on (node, port) */
0081 struct rb_root members;
/* Members whose send window is below ADV_IDLE, smallest window first */
0082 struct list_head small_win;
/* Members in state MBR_PENDING, oldest first */
0083 struct list_head pending;
/* Members in state MBR_ACTIVE, least recently active first */
0084 struct list_head active;
/* Nodes hosting at least one member */
0085 struct tipc_nlist dests;
0086 struct net *net;
/* Subscription id filled in by tipc_topsrv_kern_subscr() */
0087 int subid;
/* type, instance and scope are copied from the join request */
0088 u32 type;
0089 u32 instance;
0090 u32 scope;
/* Port id of the owner of this group instance */
0091 u32 portid;
0092 u16 member_cnt;
/* Number of members in state ACTIVE, RECLAIMING or REMITTED */
0093 u16 active_cnt;
/* Recomputed by tipc_group_rcvbuf_limit() */
0094 u16 max_active;
/* Sequence number of the next broadcast to be sent */
0095 u16 bc_snd_nxt;
/* Number of acknowledgements still awaited for the last broadcast */
0096 u16 bc_ackers;
/* Caller-owned flag; cleared when congested, set when sending may resume */
0097 bool *open;
/* From TIPC_GROUP_LOOPBACK in the join request flags */
0098 bool loopback;
/* From TIPC_GROUP_MEMBER_EVTS; if unset, member events are not delivered */
0099 bool events;
0100 };
0101
/* Forward declaration; the function is defined further down in this file
 * but is needed by the functions that precede its definition.
 */
0102 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
0103 int mtyp, struct sk_buff_head *xmitq);
0104
0105 static void tipc_group_open(struct tipc_member *m, bool *wakeup)
0106 {
0107 *wakeup = false;
0108 if (list_empty(&m->small_win))
0109 return;
0110 list_del_init(&m->small_win);
0111 *m->group->open = true;
0112 *wakeup = true;
0113 }
0114
0115 static void tipc_group_decr_active(struct tipc_group *grp,
0116 struct tipc_member *m)
0117 {
0118 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING ||
0119 m->state == MBR_REMITTED)
0120 grp->active_cnt--;
0121 }
0122
0123 static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
0124 {
0125 int max_active, active_pool, idle_pool;
0126 int mcnt = grp->member_cnt + 1;
0127
0128
0129 max_active = min(mcnt / 8, 64);
0130 max_active = max(max_active, 16);
0131 grp->max_active = max_active;
0132
0133
0134 active_pool = max_active * ADV_ACTIVE;
0135 idle_pool = (mcnt - max_active) * ADV_IDLE;
0136
0137
0138 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
0139 }
0140
0141 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
0142 {
0143 return grp->bc_snd_nxt;
0144 }
0145
0146 static bool tipc_group_is_receiver(struct tipc_member *m)
0147 {
0148 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
0149 }
0150
0151 static bool tipc_group_is_sender(struct tipc_member *m)
0152 {
0153 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
0154 }
0155
0156 u32 tipc_group_exclude(struct tipc_group *grp)
0157 {
0158 if (!grp->loopback)
0159 return grp->portid;
0160 return 0;
0161 }
0162
0163 struct tipc_group *tipc_group_create(struct net *net, u32 portid,
0164 struct tipc_group_req *mreq,
0165 bool *group_is_open)
0166 {
0167 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
0168 bool global = mreq->scope != TIPC_NODE_SCOPE;
0169 struct tipc_group *grp;
0170 u32 type = mreq->type;
0171
0172 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
0173 if (!grp)
0174 return NULL;
0175 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
0176 INIT_LIST_HEAD(&grp->small_win);
0177 INIT_LIST_HEAD(&grp->active);
0178 INIT_LIST_HEAD(&grp->pending);
0179 grp->members = RB_ROOT;
0180 grp->net = net;
0181 grp->portid = portid;
0182 grp->type = type;
0183 grp->instance = mreq->instance;
0184 grp->scope = mreq->scope;
0185 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
0186 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
0187 grp->open = group_is_open;
0188 *grp->open = false;
0189 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
0190 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
0191 filter, &grp->subid))
0192 return grp;
0193 kfree(grp);
0194 return NULL;
0195 }
0196
0197 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
0198 {
0199 struct rb_root *tree = &grp->members;
0200 struct tipc_member *m, *tmp;
0201 struct sk_buff_head xmitq;
0202
0203 __skb_queue_head_init(&xmitq);
0204 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
0205 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
0206 tipc_group_update_member(m, 0);
0207 }
0208 tipc_node_distr_xmit(net, &xmitq);
0209 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
0210 }
0211
0212 void tipc_group_delete(struct net *net, struct tipc_group *grp)
0213 {
0214 struct rb_root *tree = &grp->members;
0215 struct tipc_member *m, *tmp;
0216 struct sk_buff_head xmitq;
0217
0218 __skb_queue_head_init(&xmitq);
0219
0220 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
0221 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
0222 __skb_queue_purge(&m->deferredq);
0223 list_del(&m->list);
0224 kfree(m);
0225 }
0226 tipc_node_distr_xmit(net, &xmitq);
0227 tipc_nlist_purge(&grp->dests);
0228 tipc_topsrv_kern_unsubscr(net, grp->subid);
0229 kfree(grp);
0230 }
0231
0232 static struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
0233 u32 node, u32 port)
0234 {
0235 struct rb_node *n = grp->members.rb_node;
0236 u64 nkey, key = (u64)node << 32 | port;
0237 struct tipc_member *m;
0238
0239 while (n) {
0240 m = container_of(n, struct tipc_member, tree_node);
0241 nkey = (u64)m->node << 32 | m->port;
0242 if (key < nkey)
0243 n = n->rb_left;
0244 else if (key > nkey)
0245 n = n->rb_right;
0246 else
0247 return m;
0248 }
0249 return NULL;
0250 }
0251
0252 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
0253 u32 node, u32 port)
0254 {
0255 struct tipc_member *m;
0256
0257 m = tipc_group_find_member(grp, node, port);
0258 if (m && tipc_group_is_receiver(m))
0259 return m;
0260 return NULL;
0261 }
0262
0263 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
0264 u32 node)
0265 {
0266 struct tipc_member *m;
0267 struct rb_node *n;
0268
0269 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
0270 m = container_of(n, struct tipc_member, tree_node);
0271 if (m->node == node)
0272 return m;
0273 }
0274 return NULL;
0275 }
0276
0277 static int tipc_group_add_to_tree(struct tipc_group *grp,
0278 struct tipc_member *m)
0279 {
0280 u64 nkey, key = (u64)m->node << 32 | m->port;
0281 struct rb_node **n, *parent = NULL;
0282 struct tipc_member *tmp;
0283
0284 n = &grp->members.rb_node;
0285 while (*n) {
0286 tmp = container_of(*n, struct tipc_member, tree_node);
0287 parent = *n;
0288 tmp = container_of(parent, struct tipc_member, tree_node);
0289 nkey = (u64)tmp->node << 32 | tmp->port;
0290 if (key < nkey)
0291 n = &(*n)->rb_left;
0292 else if (key > nkey)
0293 n = &(*n)->rb_right;
0294 else
0295 return -EEXIST;
0296 }
0297 rb_link_node(&m->tree_node, parent, n);
0298 rb_insert_color(&m->tree_node, &grp->members);
0299 return 0;
0300 }
0301
0302 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
0303 u32 node, u32 port,
0304 u32 instance, int state)
0305 {
0306 struct tipc_member *m;
0307 int ret;
0308
0309 m = kzalloc(sizeof(*m), GFP_ATOMIC);
0310 if (!m)
0311 return NULL;
0312 INIT_LIST_HEAD(&m->list);
0313 INIT_LIST_HEAD(&m->small_win);
0314 __skb_queue_head_init(&m->deferredq);
0315 m->group = grp;
0316 m->node = node;
0317 m->port = port;
0318 m->instance = instance;
0319 m->bc_acked = grp->bc_snd_nxt - 1;
0320 ret = tipc_group_add_to_tree(grp, m);
0321 if (ret < 0) {
0322 kfree(m);
0323 return NULL;
0324 }
0325 grp->member_cnt++;
0326 tipc_nlist_add(&grp->dests, m->node);
0327 m->state = state;
0328 return m;
0329 }
0330
0331 void tipc_group_add_member(struct tipc_group *grp, u32 node,
0332 u32 port, u32 instance)
0333 {
0334 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
0335 }
0336
0337 static void tipc_group_delete_member(struct tipc_group *grp,
0338 struct tipc_member *m)
0339 {
0340 rb_erase(&m->tree_node, &grp->members);
0341 grp->member_cnt--;
0342
0343
0344 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
0345 grp->bc_ackers--;
0346
0347 list_del_init(&m->list);
0348 list_del_init(&m->small_win);
0349 tipc_group_decr_active(grp, m);
0350
0351
0352 if (!tipc_group_find_node(grp, m->node))
0353 tipc_nlist_del(&grp->dests, m->node);
0354
0355 kfree(m);
0356 }
0357
0358 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
0359 {
0360 return &grp->dests;
0361 }
0362
0363 void tipc_group_self(struct tipc_group *grp, struct tipc_service_range *seq,
0364 int *scope)
0365 {
0366 seq->type = grp->type;
0367 seq->lower = grp->instance;
0368 seq->upper = grp->instance;
0369 *scope = grp->scope;
0370 }
0371
0372 void tipc_group_update_member(struct tipc_member *m, int len)
0373 {
0374 struct tipc_group *grp = m->group;
0375 struct tipc_member *_m, *tmp;
0376
0377 if (!tipc_group_is_receiver(m))
0378 return;
0379
0380 m->window -= len;
0381
0382 if (m->window >= ADV_IDLE)
0383 return;
0384
0385 list_del_init(&m->small_win);
0386
0387
0388 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) {
0389 if (_m->window > m->window)
0390 break;
0391 }
0392 list_add_tail(&m->small_win, &_m->small_win);
0393 }
0394
0395 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
0396 {
0397 u16 prev = grp->bc_snd_nxt - 1;
0398 struct tipc_member *m;
0399 struct rb_node *n;
0400 u16 ackers = 0;
0401
0402 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
0403 m = container_of(n, struct tipc_member, tree_node);
0404 if (tipc_group_is_receiver(m)) {
0405 tipc_group_update_member(m, len);
0406 m->bc_acked = prev;
0407 ackers++;
0408 }
0409 }
0410
0411
0412 if (ack)
0413 grp->bc_ackers = ackers;
0414 grp->bc_snd_nxt++;
0415 }
0416
0417 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
0418 int len, struct tipc_member **mbr)
0419 {
0420 struct sk_buff_head xmitq;
0421 struct tipc_member *m;
0422 int adv, state;
0423
0424 m = tipc_group_find_dest(grp, dnode, dport);
0425 if (!tipc_group_is_receiver(m)) {
0426 *mbr = NULL;
0427 return false;
0428 }
0429 *mbr = m;
0430
0431 if (m->window >= len)
0432 return false;
0433
0434 *grp->open = false;
0435
0436
0437 adv = m->advertised;
0438 state = m->state;
0439 if (state == MBR_JOINED && adv == ADV_IDLE)
0440 return true;
0441 if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
0442 return true;
0443 if (state == MBR_PENDING && adv == ADV_IDLE)
0444 return true;
0445 __skb_queue_head_init(&xmitq);
0446 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
0447 tipc_node_distr_xmit(grp->net, &xmitq);
0448 return true;
0449 }
0450
0451 bool tipc_group_bc_cong(struct tipc_group *grp, int len)
0452 {
0453 struct tipc_member *m = NULL;
0454
0455
0456 if (grp->bc_ackers) {
0457 *grp->open = false;
0458 return true;
0459 }
0460 if (list_empty(&grp->small_win))
0461 return false;
0462
0463 m = list_first_entry(&grp->small_win, struct tipc_member, small_win);
0464 if (m->window >= len)
0465 return false;
0466
0467 return tipc_group_cong(grp, m->node, m->port, len, &m);
0468 }
0469
0470
0471
0472 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
0473 {
0474 struct tipc_msg *_hdr, *hdr = buf_msg(skb);
0475 u16 bc_seqno = msg_grp_bc_seqno(hdr);
0476 struct sk_buff *_skb, *tmp;
0477 int mtyp = msg_type(hdr);
0478
0479
0480 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
0481 skb_queue_walk_safe(defq, _skb, tmp) {
0482 _hdr = buf_msg(_skb);
0483 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
0484 continue;
0485 __skb_queue_before(defq, _skb, skb);
0486 return;
0487 }
0488
0489 }
0490
0491 __skb_queue_tail(defq, skb);
0492 }
0493
0494
0495
0496 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
0497 struct sk_buff_head *xmitq)
0498 {
0499 struct sk_buff *skb = __skb_dequeue(inputq);
0500 bool ack, deliver, update, leave = false;
0501 struct sk_buff_head *defq;
0502 struct tipc_member *m;
0503 struct tipc_msg *hdr;
0504 u32 node, port;
0505 int mtyp, blks;
0506
0507 if (!skb)
0508 return;
0509
0510 hdr = buf_msg(skb);
0511 node = msg_orignode(hdr);
0512 port = msg_origport(hdr);
0513
0514 if (!msg_in_group(hdr))
0515 goto drop;
0516
0517 m = tipc_group_find_member(grp, node, port);
0518 if (!tipc_group_is_sender(m))
0519 goto drop;
0520
0521 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
0522 goto drop;
0523
0524 TIPC_SKB_CB(skb)->orig_member = m->instance;
0525 defq = &m->deferredq;
0526 tipc_group_sort_msg(skb, defq);
0527
0528 while ((skb = skb_peek(defq))) {
0529 hdr = buf_msg(skb);
0530 mtyp = msg_type(hdr);
0531 blks = msg_blocks(hdr);
0532 deliver = true;
0533 ack = false;
0534 update = false;
0535
0536 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
0537 break;
0538
0539
0540 switch (mtyp) {
0541 case TIPC_GRP_MCAST_MSG:
0542 if (msg_nameinst(hdr) != grp->instance) {
0543 update = true;
0544 deliver = false;
0545 }
0546 fallthrough;
0547 case TIPC_GRP_BCAST_MSG:
0548 m->bc_rcv_nxt++;
0549 ack = msg_grp_bc_ack_req(hdr);
0550 break;
0551 case TIPC_GRP_UCAST_MSG:
0552 break;
0553 case TIPC_GRP_MEMBER_EVT:
0554 if (m->state == MBR_LEAVING)
0555 leave = true;
0556 if (!grp->events)
0557 deliver = false;
0558 break;
0559 default:
0560 break;
0561 }
0562
0563
0564 __skb_dequeue(defq);
0565 if (deliver)
0566 __skb_queue_tail(inputq, skb);
0567 else
0568 kfree_skb(skb);
0569
0570 if (ack)
0571 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
0572
0573 if (leave) {
0574 __skb_queue_purge(defq);
0575 tipc_group_delete_member(grp, m);
0576 break;
0577 }
0578 if (!update)
0579 continue;
0580
0581 tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
0582 }
0583 return;
0584 drop:
0585 kfree_skb(skb);
0586 }
0587
0588 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
0589 u32 port, struct sk_buff_head *xmitq)
0590 {
0591 struct list_head *active = &grp->active;
0592 int max_active = grp->max_active;
0593 int reclaim_limit = max_active * 3 / 4;
0594 int active_cnt = grp->active_cnt;
0595 struct tipc_member *m, *rm, *pm;
0596
0597 m = tipc_group_find_member(grp, node, port);
0598 if (!m)
0599 return;
0600
0601 m->advertised -= blks;
0602
0603 switch (m->state) {
0604 case MBR_JOINED:
0605
0606 if (active_cnt <= max_active) {
0607 m->state = MBR_ACTIVE;
0608 list_add_tail(&m->list, active);
0609 grp->active_cnt++;
0610 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
0611 } else {
0612 m->state = MBR_PENDING;
0613 list_add_tail(&m->list, &grp->pending);
0614 }
0615
0616 if (active_cnt < reclaim_limit)
0617 break;
0618
0619
0620 if (!list_empty(active)) {
0621 rm = list_first_entry(active, struct tipc_member, list);
0622 rm->state = MBR_RECLAIMING;
0623 list_del_init(&rm->list);
0624 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
0625 break;
0626 }
0627
0628 pm = list_first_entry(&grp->pending, struct tipc_member, list);
0629 list_del_init(&pm->list);
0630 pm->state = MBR_JOINED;
0631 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
0632 break;
0633 case MBR_ACTIVE:
0634 if (!list_is_last(&m->list, &grp->active))
0635 list_move_tail(&m->list, &grp->active);
0636 if (m->advertised > (ADV_ACTIVE * 3 / 4))
0637 break;
0638 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
0639 break;
0640 case MBR_REMITTED:
0641 if (m->advertised > ADV_IDLE)
0642 break;
0643 m->state = MBR_JOINED;
0644 grp->active_cnt--;
0645 if (m->advertised < ADV_IDLE) {
0646 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
0647 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
0648 }
0649
0650 if (list_empty(&grp->pending))
0651 return;
0652
0653
0654 pm = list_first_entry(&grp->pending, struct tipc_member, list);
0655 pm->state = MBR_ACTIVE;
0656 list_move_tail(&pm->list, &grp->active);
0657 grp->active_cnt++;
0658 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
0659 break;
0660 case MBR_RECLAIMING:
0661 case MBR_JOINING:
0662 case MBR_LEAVING:
0663 default:
0664 break;
0665 }
0666 }
0667
0668 static void tipc_group_create_event(struct tipc_group *grp,
0669 struct tipc_member *m,
0670 u32 event, u16 seqno,
0671 struct sk_buff_head *inputq)
0672 { u32 dnode = tipc_own_addr(grp->net);
0673 struct tipc_event evt;
0674 struct sk_buff *skb;
0675 struct tipc_msg *hdr;
0676
0677 memset(&evt, 0, sizeof(evt));
0678 evt.event = event;
0679 evt.found_lower = m->instance;
0680 evt.found_upper = m->instance;
0681 evt.port.ref = m->port;
0682 evt.port.node = m->node;
0683 evt.s.seq.type = grp->type;
0684 evt.s.seq.lower = m->instance;
0685 evt.s.seq.upper = m->instance;
0686
0687 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
0688 GROUP_H_SIZE, sizeof(evt), dnode, m->node,
0689 grp->portid, m->port, 0);
0690 if (!skb)
0691 return;
0692
0693 hdr = buf_msg(skb);
0694 msg_set_nametype(hdr, grp->type);
0695 msg_set_grp_evt(hdr, event);
0696 msg_set_dest_droppable(hdr, true);
0697 msg_set_grp_bc_seqno(hdr, seqno);
0698 memcpy(msg_data(hdr), &evt, sizeof(evt));
0699 TIPC_SKB_CB(skb)->orig_member = m->instance;
0700 __skb_queue_tail(inputq, skb);
0701 }
0702
0703 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
0704 int mtyp, struct sk_buff_head *xmitq)
0705 {
0706 struct tipc_msg *hdr;
0707 struct sk_buff *skb;
0708 int adv = 0;
0709
0710 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
0711 m->node, tipc_own_addr(grp->net),
0712 m->port, grp->portid, 0);
0713 if (!skb)
0714 return;
0715
0716 if (m->state == MBR_ACTIVE)
0717 adv = ADV_ACTIVE - m->advertised;
0718 else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
0719 adv = ADV_IDLE - m->advertised;
0720
0721 hdr = buf_msg(skb);
0722
0723 if (mtyp == GRP_JOIN_MSG) {
0724 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
0725 msg_set_adv_win(hdr, adv);
0726 m->advertised += adv;
0727 } else if (mtyp == GRP_LEAVE_MSG) {
0728 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
0729 } else if (mtyp == GRP_ADV_MSG) {
0730 msg_set_adv_win(hdr, adv);
0731 m->advertised += adv;
0732 } else if (mtyp == GRP_ACK_MSG) {
0733 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
0734 } else if (mtyp == GRP_REMIT_MSG) {
0735 msg_set_grp_remitted(hdr, m->window);
0736 }
0737 msg_set_dest_droppable(hdr, true);
0738 __skb_queue_tail(xmitq, skb);
0739 }
0740
0741 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
0742 struct tipc_msg *hdr, struct sk_buff_head *inputq,
0743 struct sk_buff_head *xmitq)
0744 {
0745 u32 node = msg_orignode(hdr);
0746 u32 port = msg_origport(hdr);
0747 struct tipc_member *m, *pm;
0748 u16 remitted, in_flight;
0749
0750 if (!grp)
0751 return;
0752
0753 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
0754 return;
0755
0756 m = tipc_group_find_member(grp, node, port);
0757
0758 switch (msg_type(hdr)) {
0759 case GRP_JOIN_MSG:
0760 if (!m)
0761 m = tipc_group_create_member(grp, node, port,
0762 0, MBR_JOINING);
0763 if (!m)
0764 return;
0765 m->bc_syncpt = msg_grp_bc_syncpt(hdr);
0766 m->bc_rcv_nxt = m->bc_syncpt;
0767 m->window += msg_adv_win(hdr);
0768
0769
0770 if (m->state != MBR_PUBLISHED)
0771 return;
0772
0773
0774 m->state = MBR_JOINED;
0775 tipc_group_open(m, usr_wakeup);
0776 tipc_group_update_member(m, 0);
0777 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
0778 tipc_group_create_event(grp, m, TIPC_PUBLISHED,
0779 m->bc_syncpt, inputq);
0780 return;
0781 case GRP_LEAVE_MSG:
0782 if (!m)
0783 return;
0784 m->bc_syncpt = msg_grp_bc_syncpt(hdr);
0785 list_del_init(&m->list);
0786 tipc_group_open(m, usr_wakeup);
0787 tipc_group_decr_active(grp, m);
0788 m->state = MBR_LEAVING;
0789 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
0790 m->bc_syncpt, inputq);
0791 return;
0792 case GRP_ADV_MSG:
0793 if (!m)
0794 return;
0795 m->window += msg_adv_win(hdr);
0796 tipc_group_open(m, usr_wakeup);
0797 return;
0798 case GRP_ACK_MSG:
0799 if (!m)
0800 return;
0801 m->bc_acked = msg_grp_bc_acked(hdr);
0802 if (--grp->bc_ackers)
0803 return;
0804 list_del_init(&m->small_win);
0805 *m->group->open = true;
0806 *usr_wakeup = true;
0807 tipc_group_update_member(m, 0);
0808 return;
0809 case GRP_RECLAIM_MSG:
0810 if (!m)
0811 return;
0812 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
0813 m->window = ADV_IDLE;
0814 tipc_group_open(m, usr_wakeup);
0815 return;
0816 case GRP_REMIT_MSG:
0817 if (!m || m->state != MBR_RECLAIMING)
0818 return;
0819
0820 remitted = msg_grp_remitted(hdr);
0821
0822
0823 if (m->advertised > remitted) {
0824 m->state = MBR_REMITTED;
0825 in_flight = m->advertised - remitted;
0826 m->advertised = ADV_IDLE + in_flight;
0827 return;
0828 }
0829
0830 if (m->advertised < remitted)
0831 pr_warn_ratelimited("Unexpected REMIT msg\n");
0832
0833
0834 m->state = MBR_JOINED;
0835 grp->active_cnt--;
0836 m->advertised = ADV_IDLE;
0837
0838
0839 if (list_empty(&grp->pending))
0840 return;
0841 pm = list_first_entry(&grp->pending, struct tipc_member, list);
0842 pm->state = MBR_ACTIVE;
0843 list_move_tail(&pm->list, &grp->active);
0844 grp->active_cnt++;
0845 if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
0846 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
0847 return;
0848 default:
0849 pr_warn("Received unknown GROUP_PROTO message\n");
0850 }
0851 }
0852
0853
0854
0855 void tipc_group_member_evt(struct tipc_group *grp,
0856 bool *usr_wakeup,
0857 int *sk_rcvbuf,
0858 struct tipc_msg *hdr,
0859 struct sk_buff_head *inputq,
0860 struct sk_buff_head *xmitq)
0861 {
0862 struct tipc_event *evt = (void *)msg_data(hdr);
0863 u32 instance = evt->found_lower;
0864 u32 node = evt->port.node;
0865 u32 port = evt->port.ref;
0866 int event = evt->event;
0867 struct tipc_member *m;
0868 struct net *net;
0869 u32 self;
0870
0871 if (!grp)
0872 return;
0873
0874 net = grp->net;
0875 self = tipc_own_addr(net);
0876 if (!grp->loopback && node == self && port == grp->portid)
0877 return;
0878
0879 m = tipc_group_find_member(grp, node, port);
0880
0881 switch (event) {
0882 case TIPC_PUBLISHED:
0883
0884 if (!m) {
0885 m = tipc_group_create_member(grp, node, port, instance,
0886 MBR_PUBLISHED);
0887 if (!m)
0888 break;
0889 tipc_group_update_member(m, 0);
0890 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
0891 break;
0892 }
0893
0894 if (m->state != MBR_JOINING)
0895 break;
0896
0897
0898 m->instance = instance;
0899 m->state = MBR_JOINED;
0900 tipc_group_open(m, usr_wakeup);
0901 tipc_group_update_member(m, 0);
0902 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
0903 tipc_group_create_event(grp, m, TIPC_PUBLISHED,
0904 m->bc_syncpt, inputq);
0905 break;
0906 case TIPC_WITHDRAWN:
0907 if (!m)
0908 break;
0909
0910 tipc_group_decr_active(grp, m);
0911 m->state = MBR_LEAVING;
0912 list_del_init(&m->list);
0913 tipc_group_open(m, usr_wakeup);
0914
0915
0916 if (!tipc_node_is_up(net, node))
0917 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
0918 m->bc_rcv_nxt, inputq);
0919 break;
0920 default:
0921 break;
0922 }
0923 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
0924 }
0925
0926 int tipc_group_fill_sock_diag(struct tipc_group *grp, struct sk_buff *skb)
0927 {
0928 struct nlattr *group = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_GROUP);
0929
0930 if (!group)
0931 return -EMSGSIZE;
0932
0933 if (nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_ID,
0934 grp->type) ||
0935 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_INSTANCE,
0936 grp->instance) ||
0937 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_BC_SEND_NEXT,
0938 grp->bc_snd_nxt))
0939 goto group_msg_cancel;
0940
0941 if (grp->scope == TIPC_NODE_SCOPE)
0942 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_NODE_SCOPE))
0943 goto group_msg_cancel;
0944
0945 if (grp->scope == TIPC_CLUSTER_SCOPE)
0946 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_CLUSTER_SCOPE))
0947 goto group_msg_cancel;
0948
0949 if (*grp->open)
0950 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_OPEN))
0951 goto group_msg_cancel;
0952
0953 nla_nest_end(skb, group);
0954 return 0;
0955
0956 group_msg_cancel:
0957 nla_nest_cancel(skb, group);
0958 return -1;
0959 }