0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include "subscr.h"
0038 #include "topsrv.h"
0039 #include "core.h"
0040 #include "socket.h"
0041 #include "addr.h"
0042 #include "msg.h"
0043 #include "bearer.h"
0044 #include <net/sock.h>
0045 #include <linux/module.h>
0046
0047
/* Number of back-to-back messages handled before the worker yields the CPU */
#define MAX_RECV_MSG_COUNT	25
#define MAX_SEND_MSG_COUNT	25

/* Bit number in tipc_conn::flags that marks a connection as live */
#define CF_CONNECTED		1

/* Size in bytes of the tipc_topsrv::name buffer */
#define TIPC_SERVER_NAME_LEN	32
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066 struct tipc_topsrv {
0067 struct idr conn_idr;
0068 spinlock_t idr_lock;
0069 int idr_in_use;
0070 struct net *net;
0071 struct work_struct awork;
0072 struct workqueue_struct *rcv_wq;
0073 struct workqueue_struct *send_wq;
0074 struct socket *listener;
0075 char name[TIPC_SERVER_NAME_LEN];
0076 };
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092 struct tipc_conn {
0093 struct kref kref;
0094 int conid;
0095 struct socket *sock;
0096 unsigned long flags;
0097 struct tipc_topsrv *server;
0098 struct list_head sub_list;
0099 spinlock_t sub_lock;
0100 struct work_struct rwork;
0101 struct list_head outqueue;
0102 spinlock_t outqueue_lock;
0103 struct work_struct swork;
0104 };
0105
0106
0107 struct outqueue_entry {
0108 bool inactive;
0109 struct tipc_event evt;
0110 struct list_head list;
0111 };
0112
/* Forward declarations for helpers referenced before their definition */
static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s);
static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt);
static void tipc_conn_send_work(struct work_struct *work);
static void tipc_conn_recv_work(struct work_struct *work);
0117
0118 static bool connected(struct tipc_conn *con)
0119 {
0120 return con && test_bit(CF_CONNECTED, &con->flags);
0121 }
0122
0123 static void tipc_conn_kref_release(struct kref *kref)
0124 {
0125 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
0126 struct tipc_topsrv *s = con->server;
0127 struct outqueue_entry *e, *safe;
0128
0129 spin_lock_bh(&s->idr_lock);
0130 idr_remove(&s->conn_idr, con->conid);
0131 s->idr_in_use--;
0132 spin_unlock_bh(&s->idr_lock);
0133 if (con->sock)
0134 sock_release(con->sock);
0135
0136 spin_lock_bh(&con->outqueue_lock);
0137 list_for_each_entry_safe(e, safe, &con->outqueue, list) {
0138 list_del(&e->list);
0139 kfree(e);
0140 }
0141 spin_unlock_bh(&con->outqueue_lock);
0142 kfree(con);
0143 }
0144
0145 static void conn_put(struct tipc_conn *con)
0146 {
0147 kref_put(&con->kref, tipc_conn_kref_release);
0148 }
0149
0150 static void conn_get(struct tipc_conn *con)
0151 {
0152 kref_get(&con->kref);
0153 }
0154
0155 static void tipc_conn_close(struct tipc_conn *con)
0156 {
0157 struct sock *sk = con->sock->sk;
0158 bool disconnect = false;
0159
0160 write_lock_bh(&sk->sk_callback_lock);
0161 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
0162
0163 if (disconnect) {
0164 sk->sk_user_data = NULL;
0165 tipc_conn_delete_sub(con, NULL);
0166 }
0167 write_unlock_bh(&sk->sk_callback_lock);
0168
0169
0170 if (!disconnect)
0171 return;
0172
0173
0174 kernel_sock_shutdown(con->sock, SHUT_RDWR);
0175
0176 conn_put(con);
0177 }
0178
0179 static struct tipc_conn *tipc_conn_alloc(struct tipc_topsrv *s)
0180 {
0181 struct tipc_conn *con;
0182 int ret;
0183
0184 con = kzalloc(sizeof(*con), GFP_ATOMIC);
0185 if (!con)
0186 return ERR_PTR(-ENOMEM);
0187
0188 kref_init(&con->kref);
0189 INIT_LIST_HEAD(&con->outqueue);
0190 INIT_LIST_HEAD(&con->sub_list);
0191 spin_lock_init(&con->outqueue_lock);
0192 spin_lock_init(&con->sub_lock);
0193 INIT_WORK(&con->swork, tipc_conn_send_work);
0194 INIT_WORK(&con->rwork, tipc_conn_recv_work);
0195
0196 spin_lock_bh(&s->idr_lock);
0197 ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
0198 if (ret < 0) {
0199 kfree(con);
0200 spin_unlock_bh(&s->idr_lock);
0201 return ERR_PTR(-ENOMEM);
0202 }
0203 con->conid = ret;
0204 s->idr_in_use++;
0205 spin_unlock_bh(&s->idr_lock);
0206
0207 set_bit(CF_CONNECTED, &con->flags);
0208 con->server = s;
0209
0210 return con;
0211 }
0212
0213 static struct tipc_conn *tipc_conn_lookup(struct tipc_topsrv *s, int conid)
0214 {
0215 struct tipc_conn *con;
0216
0217 spin_lock_bh(&s->idr_lock);
0218 con = idr_find(&s->conn_idr, conid);
0219 if (!connected(con) || !kref_get_unless_zero(&con->kref))
0220 con = NULL;
0221 spin_unlock_bh(&s->idr_lock);
0222 return con;
0223 }
0224
0225
0226
0227
0228 static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
0229 {
0230 struct tipc_net *tn = tipc_net(con->server->net);
0231 struct list_head *sub_list = &con->sub_list;
0232 struct tipc_subscription *sub, *tmp;
0233
0234 spin_lock_bh(&con->sub_lock);
0235 list_for_each_entry_safe(sub, tmp, sub_list, sub_list) {
0236 if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
0237 tipc_sub_unsubscribe(sub);
0238 atomic_dec(&tn->subscription_count);
0239 if (s)
0240 break;
0241 }
0242 }
0243 spin_unlock_bh(&con->sub_lock);
0244 }
0245
0246 static void tipc_conn_send_to_sock(struct tipc_conn *con)
0247 {
0248 struct list_head *queue = &con->outqueue;
0249 struct tipc_topsrv *srv = con->server;
0250 struct outqueue_entry *e;
0251 struct tipc_event *evt;
0252 struct msghdr msg;
0253 struct kvec iov;
0254 int count = 0;
0255 int ret;
0256
0257 spin_lock_bh(&con->outqueue_lock);
0258
0259 while (!list_empty(queue)) {
0260 e = list_first_entry(queue, struct outqueue_entry, list);
0261 evt = &e->evt;
0262 spin_unlock_bh(&con->outqueue_lock);
0263
0264 if (e->inactive)
0265 tipc_conn_delete_sub(con, &evt->s);
0266
0267 memset(&msg, 0, sizeof(msg));
0268 msg.msg_flags = MSG_DONTWAIT;
0269 iov.iov_base = evt;
0270 iov.iov_len = sizeof(*evt);
0271 msg.msg_name = NULL;
0272
0273 if (con->sock) {
0274 ret = kernel_sendmsg(con->sock, &msg, &iov,
0275 1, sizeof(*evt));
0276 if (ret == -EWOULDBLOCK || ret == 0) {
0277 cond_resched();
0278 return;
0279 } else if (ret < 0) {
0280 return tipc_conn_close(con);
0281 }
0282 } else {
0283 tipc_topsrv_kern_evt(srv->net, evt);
0284 }
0285
0286
0287 if (++count >= MAX_SEND_MSG_COUNT) {
0288 cond_resched();
0289 count = 0;
0290 }
0291 spin_lock_bh(&con->outqueue_lock);
0292 list_del(&e->list);
0293 kfree(e);
0294 }
0295 spin_unlock_bh(&con->outqueue_lock);
0296 }
0297
0298 static void tipc_conn_send_work(struct work_struct *work)
0299 {
0300 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
0301
0302 if (connected(con))
0303 tipc_conn_send_to_sock(con);
0304
0305 conn_put(con);
0306 }
0307
0308
0309
0310
0311 void tipc_topsrv_queue_evt(struct net *net, int conid,
0312 u32 event, struct tipc_event *evt)
0313 {
0314 struct tipc_topsrv *srv = tipc_topsrv(net);
0315 struct outqueue_entry *e;
0316 struct tipc_conn *con;
0317
0318 con = tipc_conn_lookup(srv, conid);
0319 if (!con)
0320 return;
0321
0322 if (!connected(con))
0323 goto err;
0324
0325 e = kmalloc(sizeof(*e), GFP_ATOMIC);
0326 if (!e)
0327 goto err;
0328 e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
0329 memcpy(&e->evt, evt, sizeof(*evt));
0330 spin_lock_bh(&con->outqueue_lock);
0331 list_add_tail(&e->list, &con->outqueue);
0332 spin_unlock_bh(&con->outqueue_lock);
0333
0334 if (queue_work(srv->send_wq, &con->swork))
0335 return;
0336 err:
0337 conn_put(con);
0338 }
0339
0340
0341
0342
0343
0344 static void tipc_conn_write_space(struct sock *sk)
0345 {
0346 struct tipc_conn *con;
0347
0348 read_lock_bh(&sk->sk_callback_lock);
0349 con = sk->sk_user_data;
0350 if (connected(con)) {
0351 conn_get(con);
0352 if (!queue_work(con->server->send_wq, &con->swork))
0353 conn_put(con);
0354 }
0355 read_unlock_bh(&sk->sk_callback_lock);
0356 }
0357
0358 static int tipc_conn_rcv_sub(struct tipc_topsrv *srv,
0359 struct tipc_conn *con,
0360 struct tipc_subscr *s)
0361 {
0362 struct tipc_net *tn = tipc_net(srv->net);
0363 struct tipc_subscription *sub;
0364 u32 s_filter = tipc_sub_read(s, filter);
0365
0366 if (s_filter & TIPC_SUB_CANCEL) {
0367 tipc_sub_write(s, filter, s_filter & ~TIPC_SUB_CANCEL);
0368 tipc_conn_delete_sub(con, s);
0369 return 0;
0370 }
0371 if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
0372 pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
0373 return -1;
0374 }
0375 sub = tipc_sub_subscribe(srv->net, s, con->conid);
0376 if (!sub)
0377 return -1;
0378 atomic_inc(&tn->subscription_count);
0379 spin_lock_bh(&con->sub_lock);
0380 list_add(&sub->sub_list, &con->sub_list);
0381 spin_unlock_bh(&con->sub_lock);
0382 return 0;
0383 }
0384
0385 static int tipc_conn_rcv_from_sock(struct tipc_conn *con)
0386 {
0387 struct tipc_topsrv *srv = con->server;
0388 struct sock *sk = con->sock->sk;
0389 struct msghdr msg = {};
0390 struct tipc_subscr s;
0391 struct kvec iov;
0392 int ret;
0393
0394 iov.iov_base = &s;
0395 iov.iov_len = sizeof(s);
0396 msg.msg_name = NULL;
0397 iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, iov.iov_len);
0398 ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
0399 if (ret == -EWOULDBLOCK)
0400 return -EWOULDBLOCK;
0401 if (ret == sizeof(s)) {
0402 read_lock_bh(&sk->sk_callback_lock);
0403
0404 if (likely(connected(con)))
0405 ret = tipc_conn_rcv_sub(srv, con, &s);
0406 read_unlock_bh(&sk->sk_callback_lock);
0407 if (!ret)
0408 return 0;
0409 }
0410
0411 tipc_conn_close(con);
0412 return ret;
0413 }
0414
0415 static void tipc_conn_recv_work(struct work_struct *work)
0416 {
0417 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
0418 int count = 0;
0419
0420 while (connected(con)) {
0421 if (tipc_conn_rcv_from_sock(con))
0422 break;
0423
0424
0425 if (++count >= MAX_RECV_MSG_COUNT) {
0426 cond_resched();
0427 count = 0;
0428 }
0429 }
0430 conn_put(con);
0431 }
0432
0433
0434
0435
0436 static void tipc_conn_data_ready(struct sock *sk)
0437 {
0438 struct tipc_conn *con;
0439
0440 read_lock_bh(&sk->sk_callback_lock);
0441 con = sk->sk_user_data;
0442 if (connected(con)) {
0443 conn_get(con);
0444 if (!queue_work(con->server->rcv_wq, &con->rwork))
0445 conn_put(con);
0446 }
0447 read_unlock_bh(&sk->sk_callback_lock);
0448 }
0449
0450 static void tipc_topsrv_accept(struct work_struct *work)
0451 {
0452 struct tipc_topsrv *srv = container_of(work, struct tipc_topsrv, awork);
0453 struct socket *lsock = srv->listener;
0454 struct socket *newsock;
0455 struct tipc_conn *con;
0456 struct sock *newsk;
0457 int ret;
0458
0459 while (1) {
0460 ret = kernel_accept(lsock, &newsock, O_NONBLOCK);
0461 if (ret < 0)
0462 return;
0463 con = tipc_conn_alloc(srv);
0464 if (IS_ERR(con)) {
0465 ret = PTR_ERR(con);
0466 sock_release(newsock);
0467 return;
0468 }
0469
0470 newsk = newsock->sk;
0471 write_lock_bh(&newsk->sk_callback_lock);
0472 newsk->sk_data_ready = tipc_conn_data_ready;
0473 newsk->sk_write_space = tipc_conn_write_space;
0474 newsk->sk_user_data = con;
0475 con->sock = newsock;
0476 write_unlock_bh(&newsk->sk_callback_lock);
0477
0478
0479 newsk->sk_data_ready(newsk);
0480 }
0481 }
0482
0483
0484
0485
0486 static void tipc_topsrv_listener_data_ready(struct sock *sk)
0487 {
0488 struct tipc_topsrv *srv;
0489
0490 read_lock_bh(&sk->sk_callback_lock);
0491 srv = sk->sk_user_data;
0492 if (srv->listener)
0493 queue_work(srv->rcv_wq, &srv->awork);
0494 read_unlock_bh(&sk->sk_callback_lock);
0495 }
0496
0497 static int tipc_topsrv_create_listener(struct tipc_topsrv *srv)
0498 {
0499 struct socket *lsock = NULL;
0500 struct sockaddr_tipc saddr;
0501 struct sock *sk;
0502 int rc;
0503
0504 rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock);
0505 if (rc < 0)
0506 return rc;
0507
0508 srv->listener = lsock;
0509 sk = lsock->sk;
0510 write_lock_bh(&sk->sk_callback_lock);
0511 sk->sk_data_ready = tipc_topsrv_listener_data_ready;
0512 sk->sk_user_data = srv;
0513 write_unlock_bh(&sk->sk_callback_lock);
0514
0515 lock_sock(sk);
0516 rc = tsk_set_importance(sk, TIPC_CRITICAL_IMPORTANCE);
0517 release_sock(sk);
0518 if (rc < 0)
0519 goto err;
0520
0521 saddr.family = AF_TIPC;
0522 saddr.addrtype = TIPC_SERVICE_RANGE;
0523 saddr.addr.nameseq.type = TIPC_TOP_SRV;
0524 saddr.addr.nameseq.lower = TIPC_TOP_SRV;
0525 saddr.addr.nameseq.upper = TIPC_TOP_SRV;
0526 saddr.scope = TIPC_NODE_SCOPE;
0527
0528 rc = tipc_sk_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr));
0529 if (rc < 0)
0530 goto err;
0531 rc = kernel_listen(lsock, 0);
0532 if (rc < 0)
0533 goto err;
0534
0535
0536
0537
0538
0539
0540
0541
0542
0543
0544
0545
0546
0547
0548
0549
0550 module_put(lsock->ops->owner);
0551 module_put(sk->sk_prot_creator->owner);
0552
0553 return 0;
0554 err:
0555 sock_release(lsock);
0556 return -EINVAL;
0557 }
0558
0559 bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
0560 u32 upper, u32 filter, int *conid)
0561 {
0562 struct tipc_subscr sub;
0563 struct tipc_conn *con;
0564 int rc;
0565
0566 sub.seq.type = type;
0567 sub.seq.lower = lower;
0568 sub.seq.upper = upper;
0569 sub.timeout = TIPC_WAIT_FOREVER;
0570 sub.filter = filter;
0571 *(u32 *)&sub.usr_handle = port;
0572
0573 con = tipc_conn_alloc(tipc_topsrv(net));
0574 if (IS_ERR(con))
0575 return false;
0576
0577 *conid = con->conid;
0578 con->sock = NULL;
0579 rc = tipc_conn_rcv_sub(tipc_topsrv(net), con, &sub);
0580 if (rc >= 0)
0581 return true;
0582 conn_put(con);
0583 return false;
0584 }
0585
0586 void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
0587 {
0588 struct tipc_conn *con;
0589
0590 con = tipc_conn_lookup(tipc_topsrv(net), conid);
0591 if (!con)
0592 return;
0593
0594 test_and_clear_bit(CF_CONNECTED, &con->flags);
0595 tipc_conn_delete_sub(con, NULL);
0596 conn_put(con);
0597 conn_put(con);
0598 }
0599
0600 static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt)
0601 {
0602 u32 port = *(u32 *)&evt->s.usr_handle;
0603 u32 self = tipc_own_addr(net);
0604 struct sk_buff_head evtq;
0605 struct sk_buff *skb;
0606
0607 skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
0608 self, self, port, port, 0);
0609 if (!skb)
0610 return;
0611 msg_set_dest_droppable(buf_msg(skb), true);
0612 memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
0613 skb_queue_head_init(&evtq);
0614 __skb_queue_tail(&evtq, skb);
0615 tipc_loopback_trace(net, &evtq);
0616 tipc_sk_rcv(net, &evtq);
0617 }
0618
0619 static int tipc_topsrv_work_start(struct tipc_topsrv *s)
0620 {
0621 s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
0622 if (!s->rcv_wq) {
0623 pr_err("can't start tipc receive workqueue\n");
0624 return -ENOMEM;
0625 }
0626
0627 s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
0628 if (!s->send_wq) {
0629 pr_err("can't start tipc send workqueue\n");
0630 destroy_workqueue(s->rcv_wq);
0631 return -ENOMEM;
0632 }
0633
0634 return 0;
0635 }
0636
0637 static void tipc_topsrv_work_stop(struct tipc_topsrv *s)
0638 {
0639 destroy_workqueue(s->rcv_wq);
0640 destroy_workqueue(s->send_wq);
0641 }
0642
0643 static int tipc_topsrv_start(struct net *net)
0644 {
0645 struct tipc_net *tn = tipc_net(net);
0646 const char name[] = "topology_server";
0647 struct tipc_topsrv *srv;
0648 int ret;
0649
0650 srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
0651 if (!srv)
0652 return -ENOMEM;
0653
0654 srv->net = net;
0655 INIT_WORK(&srv->awork, tipc_topsrv_accept);
0656
0657 strscpy(srv->name, name, sizeof(srv->name));
0658 tn->topsrv = srv;
0659 atomic_set(&tn->subscription_count, 0);
0660
0661 spin_lock_init(&srv->idr_lock);
0662 idr_init(&srv->conn_idr);
0663 srv->idr_in_use = 0;
0664
0665 ret = tipc_topsrv_work_start(srv);
0666 if (ret < 0)
0667 goto err_start;
0668
0669 ret = tipc_topsrv_create_listener(srv);
0670 if (ret < 0)
0671 goto err_create;
0672
0673 return 0;
0674
0675 err_create:
0676 tipc_topsrv_work_stop(srv);
0677 err_start:
0678 kfree(srv);
0679 return ret;
0680 }
0681
0682 static void tipc_topsrv_stop(struct net *net)
0683 {
0684 struct tipc_topsrv *srv = tipc_topsrv(net);
0685 struct socket *lsock = srv->listener;
0686 struct tipc_conn *con;
0687 int id;
0688
0689 spin_lock_bh(&srv->idr_lock);
0690 for (id = 0; srv->idr_in_use; id++) {
0691 con = idr_find(&srv->conn_idr, id);
0692 if (con) {
0693 spin_unlock_bh(&srv->idr_lock);
0694 tipc_conn_close(con);
0695 spin_lock_bh(&srv->idr_lock);
0696 }
0697 }
0698 __module_get(lsock->ops->owner);
0699 __module_get(lsock->sk->sk_prot_creator->owner);
0700 srv->listener = NULL;
0701 spin_unlock_bh(&srv->idr_lock);
0702 sock_release(lsock);
0703 tipc_topsrv_work_stop(srv);
0704 idr_destroy(&srv->conn_idr);
0705 kfree(srv);
0706 }
0707
0708 int __net_init tipc_topsrv_init_net(struct net *net)
0709 {
0710 return tipc_topsrv_start(net);
0711 }
0712
0713 void __net_exit tipc_topsrv_exit_net(struct net *net)
0714 {
0715 tipc_topsrv_stop(net);
0716 }