0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038 #include "core.h"
0039 #include "link.h"
0040 #include "name_distr.h"
0041
/* Sysctl-tunable timeout (ms) for deferred name-table updates; default 2000.
 * NOTE(review): registration and consumer are not visible in this chunk.
 */
int sysctl_tipc_named_timeout __read_mostly = 2000;
0043
/**
 * struct distr_queue_item - publication event deferred for later processing
 * @i: the distributed publication data itself
 * @dtype: distribution message type (presumably PUBLICATION or WITHDRAWAL)
 * @node: network address of the originating node
 * @expires: deadline (jiffies) after which the item should be dropped
 * @next: links items in the deferral queue
 *
 * NOTE(review): no user of this struct is visible in this chunk; field
 * semantics above are inferred from names — confirm against the full file.
 */
struct distr_queue_item {
	struct distr_item i;
	u32 dtype;
	u32 node;
	unsigned long expires;
	struct list_head next;
};
0051
0052
0053
0054
0055
0056
0057 static void publ_to_item(struct distr_item *i, struct publication *p)
0058 {
0059 i->type = htonl(p->sr.type);
0060 i->lower = htonl(p->sr.lower);
0061 i->upper = htonl(p->sr.upper);
0062 i->port = htonl(p->sk.ref);
0063 i->key = htonl(p->key);
0064 }
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075 static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
0076 u32 dest)
0077 {
0078 struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size, GFP_ATOMIC);
0079 u32 self = tipc_own_addr(net);
0080 struct tipc_msg *msg;
0081
0082 if (buf != NULL) {
0083 msg = buf_msg(buf);
0084 tipc_msg_init(self, msg, NAME_DISTRIBUTOR,
0085 type, INT_H_SIZE, dest);
0086 msg_set_size(msg, INT_H_SIZE + size);
0087 }
0088 return buf;
0089 }
0090
0091
0092
0093
0094
0095
/**
 * tipc_named_publish - tell other nodes about a new publication by this node
 * @net: the associated network namespace
 * @p: the new publication
 *
 * Node-scope publications are only linked into the local node_scope list
 * (RCU-safe add) and never distributed. Cluster-scope publications are
 * appended to the cluster_scope list under its write lock and then packed
 * into a PUBLICATION message tagged with the next send sequence number.
 *
 * Return: message buffer to be sent, or NULL (node scope, or — with a
 * warning — allocation failure; the publication stays listed either way).
 */
struct sk_buff *tipc_named_publish(struct net *net, struct publication *p)
{
	struct name_table *nt = tipc_name_table(net);
	struct distr_item *item;
	struct sk_buff *skb;

	if (p->scope == TIPC_NODE_SCOPE) {
		list_add_tail_rcu(&p->binding_node, &nt->node_scope);
		return NULL;
	}
	write_lock_bh(&nt->cluster_scope_lock);
	list_add_tail(&p->binding_node, &nt->cluster_scope);
	write_unlock_bh(&nt->cluster_scope_lock);
	skb = named_prepare_buf(net, PUBLICATION, ITEM_SIZE, 0);
	if (!skb) {
		pr_warn("Publication distribution failure\n");
		return NULL;
	}
	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
	msg_set_non_legacy(buf_msg(skb));
	item = (struct distr_item *)msg_data(buf_msg(skb));
	publ_to_item(item, p);
	return skb;
}
0120
0121
0122
0123
0124
0125
/**
 * tipc_named_withdraw - tell other nodes about a withdrawn publication
 * @net: the associated network namespace
 * @p: the withdrawn publication
 *
 * The publication is unlinked from its binding list unconditionally;
 * node-scope withdrawals are not distributed, so NULL is returned for them.
 *
 * Return: WITHDRAWAL message buffer to be sent, or NULL (node scope, or —
 * with a warning — allocation failure).
 */
struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *p)
{
	struct name_table *nt = tipc_name_table(net);
	struct distr_item *item;
	struct sk_buff *skb;

	write_lock_bh(&nt->cluster_scope_lock);
	list_del(&p->binding_node);
	write_unlock_bh(&nt->cluster_scope_lock);
	if (p->scope == TIPC_NODE_SCOPE)
		return NULL;

	skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
	if (!skb) {
		pr_warn("Withdrawal distribution failure\n");
		return NULL;
	}
	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
	msg_set_non_legacy(buf_msg(skb));
	item = (struct distr_item *)msg_data(buf_msg(skb));
	publ_to_item(item, p);
	return skb;
}
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158 static void named_distribute(struct net *net, struct sk_buff_head *list,
0159 u32 dnode, struct list_head *pls, u16 seqno)
0160 {
0161 struct publication *publ;
0162 struct sk_buff *skb = NULL;
0163 struct distr_item *item = NULL;
0164 u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
0165 ITEM_SIZE) * ITEM_SIZE;
0166 u32 msg_rem = msg_dsz;
0167 struct tipc_msg *hdr;
0168
0169 list_for_each_entry(publ, pls, binding_node) {
0170
0171 if (!skb) {
0172 skb = named_prepare_buf(net, PUBLICATION, msg_rem,
0173 dnode);
0174 if (!skb) {
0175 pr_warn("Bulk publication failure\n");
0176 return;
0177 }
0178 hdr = buf_msg(skb);
0179 msg_set_bc_ack_invalid(hdr, true);
0180 msg_set_bulk(hdr);
0181 msg_set_non_legacy(hdr);
0182 item = (struct distr_item *)msg_data(hdr);
0183 }
0184
0185
0186 publ_to_item(item, publ);
0187 item++;
0188 msg_rem -= ITEM_SIZE;
0189
0190
0191 if (!msg_rem) {
0192 __skb_queue_tail(list, skb);
0193 skb = NULL;
0194 msg_rem = msg_dsz;
0195 }
0196 }
0197 if (skb) {
0198 hdr = buf_msg(skb);
0199 msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
0200 skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
0201 __skb_queue_tail(list, skb);
0202 }
0203 hdr = buf_msg(skb_peek_tail(list));
0204 msg_set_last_bulk(hdr);
0205 msg_set_named_seqno(hdr, seqno);
0206 }
0207
0208
0209
0210
0211
0212
0213
/**
 * tipc_named_node_up - tell specified node about all our publications
 * @net: the associated network namespace
 * @dnode: node about to come up
 * @capabilities: peer node's capability bits
 *
 * Snapshots the current send sequence number under the name-table lock,
 * then sends the whole cluster-scope list as a bulk update under the
 * cluster-scope read lock.
 */
void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
{
	struct name_table *nt = tipc_name_table(net);
	struct tipc_net *tn = tipc_net(net);
	struct sk_buff_head head;
	u16 seqno;

	__skb_queue_head_init(&head);
	spin_lock_bh(&tn->nametbl_lock);
	/* Peers without broadcast support are counted as replicast dests */
	if (!(capabilities & TIPC_NAMED_BCAST))
		nt->rc_dests++;
	seqno = nt->snd_nxt;
	spin_unlock_bh(&tn->nametbl_lock);

	read_lock_bh(&nt->cluster_scope_lock);
	named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
	tipc_node_xmit(net, &head, dnode, 0);
	read_unlock_bh(&nt->cluster_scope_lock);
}
0233
0234
0235
0236
0237
0238
0239
0240
0241
0242
/**
 * tipc_publ_purge - remove one publication issued by a cleaned-up node
 * @net: the associated network namespace
 * @p: publication describing what to remove (used only as a lookup key)
 * @addr: address of the failed/departed node
 *
 * Removes the matching entry from the name table under the name-table
 * lock and unsubscribes it from node events.
 */
static void tipc_publ_purge(struct net *net, struct publication *p, u32 addr)
{
	struct tipc_net *tn = tipc_net(net);
	struct publication *_p;
	struct tipc_uaddr ua;

	tipc_uaddr(&ua, TIPC_SERVICE_RANGE, p->scope, p->sr.type,
		   p->sr.lower, p->sr.upper);
	spin_lock_bh(&tn->nametbl_lock);
	_p = tipc_nametbl_remove_publ(net, &ua, &p->sk, p->key);
	if (_p)
		tipc_node_unsubscribe(net, &_p->binding_node, addr);
	spin_unlock_bh(&tn->nametbl_lock);
	/* Free outside the lock; kfree_rcu defers to an RCU grace period */
	if (_p)
		kfree_rcu(_p, rcu);
}
0259
/**
 * tipc_publ_notify - purge all publications of a departed node
 * @net: the associated network namespace
 * @nsub_list: list of publications bound by the departed node
 * @addr: address of the departed node
 * @capabilities: departed node's capability bits
 *
 * Also drops the replicast-destination count if the peer lacked
 * TIPC_NAMED_BCAST support (mirrors the increment in tipc_named_node_up).
 */
void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
		      u32 addr, u16 capabilities)
{
	struct name_table *nt = tipc_name_table(net);
	struct tipc_net *tn = tipc_net(net);

	struct publication *publ, *tmp;

	list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
		tipc_publ_purge(net, publ, addr);
	spin_lock_bh(&tn->nametbl_lock);
	if (!(capabilities & TIPC_NAMED_BCAST))
		nt->rc_dests--;
	spin_unlock_bh(&tn->nametbl_lock);
}
0275
0276
0277
0278
0279
0280
0281
0282
0283
0284
0285
0286
0287 static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
0288 u32 node, u32 dtype)
0289 {
0290 struct publication *p = NULL;
0291 struct tipc_socket_addr sk;
0292 struct tipc_uaddr ua;
0293 u32 key = ntohl(i->key);
0294
0295 tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_CLUSTER_SCOPE,
0296 ntohl(i->type), ntohl(i->lower), ntohl(i->upper));
0297 sk.ref = ntohl(i->port);
0298 sk.node = node;
0299
0300 if (dtype == PUBLICATION) {
0301 p = tipc_nametbl_insert_publ(net, &ua, &sk, key);
0302 if (p) {
0303 tipc_node_subscribe(net, &p->binding_node, node);
0304 return true;
0305 }
0306 } else if (dtype == WITHDRAWAL) {
0307 p = tipc_nametbl_remove_publ(net, &ua, &sk, key);
0308 if (p) {
0309 tipc_node_unsubscribe(net, &p->binding_node, node);
0310 kfree_rcu(p, rcu);
0311 return true;
0312 }
0313 pr_warn_ratelimited("Failed to remove binding %u,%u from %u\n",
0314 ua.sr.type, ua.sr.lower, node);
0315 } else {
0316 pr_warn_ratelimited("Unknown name table message received\n");
0317 }
0318 return false;
0319 }
0320
/**
 * tipc_named_dequeue - dequeue the next deliverable name-table message
 * @namedq: queue of received NAME_DISTRIBUTOR messages from one peer
 * @rcv_nxt: next expected sequence number (in/out)
 * @open: whether in-sequence delivery has been established (in/out)
 *
 * Walks @namedq under its lock. Bulk and legacy messages are always
 * deliverable; unicast updates only when they match *rcv_nxt after the
 * last-bulk message has opened the window. Obsolete messages (seqno
 * below *rcv_nxt) are dropped in place.
 *
 * Return: an unlinked skb ready for processing, or NULL if none is ready.
 */
static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
					  u16 *rcv_nxt, bool *open)
{
	struct sk_buff *skb, *tmp;
	struct tipc_msg *hdr;
	u16 seqno;

	spin_lock_bh(&namedq->lock);
	skb_queue_walk_safe(namedq, skb, tmp) {
		/* Linearize before reading the header; drop on failure */
		if (unlikely(skb_linearize(skb))) {
			__skb_unlink(skb, namedq);
			kfree_skb(skb);
			continue;
		}
		hdr = buf_msg(skb);
		seqno = msg_named_seqno(hdr);
		/* Last bulk message establishes where live updates resume */
		if (msg_is_last_bulk(hdr)) {
			*rcv_nxt = seqno;
			*open = true;
		}

		/* Bulk and legacy messages bypass sequence ordering */
		if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
			__skb_unlink(skb, namedq);
			spin_unlock_bh(&namedq->lock);
			return skb;
		}

		/* In-sequence update, window open: deliver and advance */
		if (*open && (*rcv_nxt == seqno)) {
			(*rcv_nxt)++;
			__skb_unlink(skb, namedq);
			spin_unlock_bh(&namedq->lock);
			return skb;
		}

		/* Obsolete message: already superseded, discard */
		if (less(seqno, *rcv_nxt)) {
			__skb_unlink(skb, namedq);
			kfree_skb(skb);
			continue;
		}
	}
	spin_unlock_bh(&namedq->lock);
	return NULL;
}
0364
0365
0366
0367
0368
0369
0370
0371
0372 void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
0373 u16 *rcv_nxt, bool *open)
0374 {
0375 struct tipc_net *tn = tipc_net(net);
0376 struct distr_item *item;
0377 struct tipc_msg *hdr;
0378 struct sk_buff *skb;
0379 u32 count, node;
0380
0381 spin_lock_bh(&tn->nametbl_lock);
0382 while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
0383 hdr = buf_msg(skb);
0384 node = msg_orignode(hdr);
0385 item = (struct distr_item *)msg_data(hdr);
0386 count = msg_data_sz(hdr) / ITEM_SIZE;
0387 while (count--) {
0388 tipc_update_nametbl(net, item, node, msg_type(hdr));
0389 item++;
0390 }
0391 kfree_skb(skb);
0392 }
0393 spin_unlock_bh(&tn->nametbl_lock);
0394 }
0395
0396
0397
0398
0399
0400
0401
0402
0403
/**
 * tipc_named_reinit - re-initialize local publications
 * @net: the associated network namespace
 *
 * Rewrites the node field of every publication issued by this node to the
 * node's current own address, and resets the replicast destination count.
 */
void tipc_named_reinit(struct net *net)
{
	struct name_table *nt = tipc_name_table(net);
	struct tipc_net *tn = tipc_net(net);
	struct publication *p;
	u32 self = tipc_own_addr(net);

	spin_lock_bh(&tn->nametbl_lock);

	/* RCU-aware traversal; updates are serialized by nametbl_lock */
	list_for_each_entry_rcu(p, &nt->node_scope, binding_node)
		p->sk.node = self;
	list_for_each_entry_rcu(p, &nt->cluster_scope, binding_node)
		p->sk.node = self;
	nt->rc_dests = 0;
	spin_unlock_bh(&tn->nametbl_lock);
}