Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * net/tipc/server.c: TIPC server infrastructure
0003  *
0004  * Copyright (c) 2012-2013, Wind River Systems
0005  * Copyright (c) 2017-2018, Ericsson AB
0006  * All rights reserved.
0007  *
0008  * Redistribution and use in source and binary forms, with or without
0009  * modification, are permitted provided that the following conditions are met:
0010  *
0011  * 1. Redistributions of source code must retain the above copyright
0012  *    notice, this list of conditions and the following disclaimer.
0013  * 2. Redistributions in binary form must reproduce the above copyright
0014  *    notice, this list of conditions and the following disclaimer in the
0015  *    documentation and/or other materials provided with the distribution.
0016  * 3. Neither the names of the copyright holders nor the names of its
0017  *    contributors may be used to endorse or promote products derived from
0018  *    this software without specific prior written permission.
0019  *
0020  * Alternatively, this software may be distributed under the terms of the
0021  * GNU General Public License ("GPL") version 2 as published by the Free
0022  * Software Foundation.
0023  *
0024  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0025  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0026  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0027  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0028  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0029  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0030  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0031  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0032  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0033  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0034  * POSSIBILITY OF SUCH DAMAGE.
0035  */
0036 
0037 #include "subscr.h"
0038 #include "topsrv.h"
0039 #include "core.h"
0040 #include "socket.h"
0041 #include "addr.h"
0042 #include "msg.h"
0043 #include "bearer.h"
0044 #include <net/sock.h>
0045 #include <linux/module.h>
0046 
0047 /* Number of messages to send before rescheduling */
0048 #define MAX_SEND_MSG_COUNT  25
0049 #define MAX_RECV_MSG_COUNT  25
0050 #define CF_CONNECTED        1
0051 
0052 #define TIPC_SERVER_NAME_LEN    32
0053 
/**
 * struct tipc_topsrv - TIPC server structure
 * @conn_idr: identifier set of connection
 * @idr_lock: protect the connection identifier set
 * @idr_in_use: amount of allocated identifier entry
 * @net: network namespace instance
 * @awork: accept work item
 * @rcv_wq: receive workqueue
 * @send_wq: send workqueue
 * @listener: topsrv listener socket
 * @name: server name
 */
struct tipc_topsrv {
    struct idr conn_idr;
    spinlock_t idr_lock; /* for idr list */
    int idr_in_use;
    struct net *net;
    struct work_struct awork;
    struct workqueue_struct *rcv_wq;
    struct workqueue_struct *send_wq;
    struct socket *listener;
    char name[TIPC_SERVER_NAME_LEN];
};
0077 
/**
 * struct tipc_conn - TIPC connection structure
 * @kref: reference counter to connection object
 * @conid: connection identifier
 * @sock: socket handler associated with connection (NULL for
 *        kernel-internal subscribers, see tipc_topsrv_kern_subscr())
 * @flags: indicates connection state
 * @server: pointer to connected server
 * @sub_list: list of all pertaining subscriptions
 * @sub_lock: lock protecting the subscription list
 * @rwork: receive work item
 * @outqueue: pointer to first outbound message in queue
 * @outqueue_lock: control access to the outqueue
 * @swork: send work item
 */
struct tipc_conn {
    struct kref kref;
    int conid;
    struct socket *sock;
    unsigned long flags;
    struct tipc_topsrv *server;
    struct list_head sub_list;
    spinlock_t sub_lock; /* for subscription list */
    struct work_struct rwork;
    struct list_head outqueue;
    spinlock_t outqueue_lock; /* for outqueue */
    struct work_struct swork;
};
0105 
/* An entry waiting to be sent */
struct outqueue_entry {
    bool inactive;          /* subscription timed out; delete it before send */
    struct tipc_event evt;  /* event to forward to the subscriber */
    struct list_head list;  /* link in tipc_conn::outqueue */
};
0112 
0113 static void tipc_conn_recv_work(struct work_struct *work);
0114 static void tipc_conn_send_work(struct work_struct *work);
0115 static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt);
0116 static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s);
0117 
0118 static bool connected(struct tipc_conn *con)
0119 {
0120     return con && test_bit(CF_CONNECTED, &con->flags);
0121 }
0122 
/* tipc_conn_kref_release - final cleanup when the last reference is dropped
 * Unhooks the connection from the server's idr, releases the socket (if
 * any) and frees all still-queued outgoing events before freeing the
 * connection object itself.
 */
static void tipc_conn_kref_release(struct kref *kref)
{
    struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
    struct tipc_topsrv *s = con->server;
    struct outqueue_entry *e, *safe;

    /* Remove from the id set first so no new lookup can find us */
    spin_lock_bh(&s->idr_lock);
    idr_remove(&s->conn_idr, con->conid);
    s->idr_in_use--;
    spin_unlock_bh(&s->idr_lock);
    if (con->sock)
        sock_release(con->sock);

    /* Drop any events that were never transmitted */
    spin_lock_bh(&con->outqueue_lock);
    list_for_each_entry_safe(e, safe, &con->outqueue, list) {
        list_del(&e->list);
        kfree(e);
    }
    spin_unlock_bh(&con->outqueue_lock);
    kfree(con);
}
0144 
/* conn_put - drop a reference; releases the connection when it hits zero */
static void conn_put(struct tipc_conn *con)
{
    kref_put(&con->kref, tipc_conn_kref_release);
}
0149 
/* conn_get - take an additional reference on an already-referenced connection */
static void conn_get(struct tipc_conn *con)
{
    kref_get(&con->kref);
}
0154 
/* tipc_conn_close - shut a connection down exactly once
 * May be called concurrently from the send and receive paths; only the
 * caller that actually clears CF_CONNECTED performs the teardown and
 * drops the connection's initial reference.
 */
static void tipc_conn_close(struct tipc_conn *con)
{
    struct sock *sk = con->sock->sk;
    bool disconnect = false;

    write_lock_bh(&sk->sk_callback_lock);
    disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);

    if (disconnect) {
        /* Detach socket callbacks and drop all subscriptions */
        sk->sk_user_data = NULL;
        tipc_conn_delete_sub(con, NULL);
    }
    write_unlock_bh(&sk->sk_callback_lock);

    /* Handle concurrent calls from sending and receiving threads */
    if (!disconnect)
        return;

    /* Don't flush pending works, just let them expire */
    kernel_sock_shutdown(con->sock, SHUT_RDWR);

    conn_put(con);
}
0178 
0179 static struct tipc_conn *tipc_conn_alloc(struct tipc_topsrv *s)
0180 {
0181     struct tipc_conn *con;
0182     int ret;
0183 
0184     con = kzalloc(sizeof(*con), GFP_ATOMIC);
0185     if (!con)
0186         return ERR_PTR(-ENOMEM);
0187 
0188     kref_init(&con->kref);
0189     INIT_LIST_HEAD(&con->outqueue);
0190     INIT_LIST_HEAD(&con->sub_list);
0191     spin_lock_init(&con->outqueue_lock);
0192     spin_lock_init(&con->sub_lock);
0193     INIT_WORK(&con->swork, tipc_conn_send_work);
0194     INIT_WORK(&con->rwork, tipc_conn_recv_work);
0195 
0196     spin_lock_bh(&s->idr_lock);
0197     ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
0198     if (ret < 0) {
0199         kfree(con);
0200         spin_unlock_bh(&s->idr_lock);
0201         return ERR_PTR(-ENOMEM);
0202     }
0203     con->conid = ret;
0204     s->idr_in_use++;
0205     spin_unlock_bh(&s->idr_lock);
0206 
0207     set_bit(CF_CONNECTED, &con->flags);
0208     con->server = s;
0209 
0210     return con;
0211 }
0212 
0213 static struct tipc_conn *tipc_conn_lookup(struct tipc_topsrv *s, int conid)
0214 {
0215     struct tipc_conn *con;
0216 
0217     spin_lock_bh(&s->idr_lock);
0218     con = idr_find(&s->conn_idr, conid);
0219     if (!connected(con) || !kref_get_unless_zero(&con->kref))
0220         con = NULL;
0221     spin_unlock_bh(&s->idr_lock);
0222     return con;
0223 }
0224 
/* tipc_conn_delete_sub - delete a specific or all subscriptions
 * for a given subscriber
 * @s: NULL means "delete every subscription on this connection";
 *     otherwise only the subscription matching *s byte-for-byte is removed
 */
static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
{
    struct tipc_net *tn = tipc_net(con->server->net);
    struct list_head *sub_list = &con->sub_list;
    struct tipc_subscription *sub, *tmp;

    spin_lock_bh(&con->sub_lock);
    list_for_each_entry_safe(sub, tmp, sub_list, sub_list) {
        if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
            tipc_sub_unsubscribe(sub);
            atomic_dec(&tn->subscription_count);
            /* A specific request matches at most one subscription */
            if (s)
                break;
        }
    }
    spin_unlock_bh(&con->sub_lock);
}
0245 
/* tipc_conn_send_to_sock - drain this connection's outqueue
 * The queue lock is dropped while an entry is transmitted so producers
 * can keep appending; the entry is unlinked and freed only after it has
 * been sent, so a blocked send leaves it at the head for a later retry.
 */
static void tipc_conn_send_to_sock(struct tipc_conn *con)
{
    struct list_head *queue = &con->outqueue;
    struct tipc_topsrv *srv = con->server;
    struct outqueue_entry *e;
    struct tipc_event *evt;
    struct msghdr msg;
    struct kvec iov;
    int count = 0;
    int ret;

    spin_lock_bh(&con->outqueue_lock);

    while (!list_empty(queue)) {
        e = list_first_entry(queue, struct outqueue_entry, list);
        evt = &e->evt;
        spin_unlock_bh(&con->outqueue_lock);

        /* A timed-out subscription is deleted just before its final
         * (timeout) event is forwarded
         */
        if (e->inactive)
            tipc_conn_delete_sub(con, &evt->s);

        memset(&msg, 0, sizeof(msg));
        msg.msg_flags = MSG_DONTWAIT;
        iov.iov_base = evt;
        iov.iov_len = sizeof(*evt);
        msg.msg_name = NULL;

        if (con->sock) {
            ret = kernel_sendmsg(con->sock, &msg, &iov,
                         1, sizeof(*evt));
            if (ret == -EWOULDBLOCK || ret == 0) {
                /* Socket buffer full; retry later from the
                 * write-space callback
                 */
                cond_resched();
                return;
            } else if (ret < 0) {
                /* Hard send error - drop the connection */
                return tipc_conn_close(con);
            }
        } else {
            /* Kernel-internal subscriber - loop the event back */
            tipc_topsrv_kern_evt(srv->net, evt);
        }

        /* Don't starve users filling buffers */
        if (++count >= MAX_SEND_MSG_COUNT) {
            cond_resched();
            count = 0;
        }
        spin_lock_bh(&con->outqueue_lock);
        list_del(&e->list);
        kfree(e);
    }
    spin_unlock_bh(&con->outqueue_lock);
}
0297 
/* tipc_conn_send_work - work handler draining a connection's outqueue
 * Owns the reference taken when the work was queued; drops it on exit.
 */
static void tipc_conn_send_work(struct work_struct *work)
{
    struct tipc_conn *con = container_of(work, struct tipc_conn, swork);

    if (connected(con))
        tipc_conn_send_to_sock(con);

    conn_put(con);
}
0307 
/* tipc_topsrv_queue_evt() - interrupt level call from a subscription instance
 * The queued work is launched into tipc_conn_send_work()->tipc_conn_send_to_sock()
 * The reference taken by the lookup is handed over to the queued work
 * item; it is dropped here only when the work cannot be queued.
 */
void tipc_topsrv_queue_evt(struct net *net, int conid,
               u32 event, struct tipc_event *evt)
{
    struct tipc_topsrv *srv = tipc_topsrv(net);
    struct outqueue_entry *e;
    struct tipc_conn *con;

    con = tipc_conn_lookup(srv, conid);
    if (!con)
        return;

    if (!connected(con))
        goto err;

    e = kmalloc(sizeof(*e), GFP_ATOMIC);
    if (!e)
        goto err;
    /* A timeout event marks the subscription for deletion at send time */
    e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
    memcpy(&e->evt, evt, sizeof(*evt));
    spin_lock_bh(&con->outqueue_lock);
    list_add_tail(&e->list, &con->outqueue);
    spin_unlock_bh(&con->outqueue_lock);

    if (queue_work(srv->send_wq, &con->swork))
        return;
err:
    conn_put(con);
}
0339 
/* tipc_conn_write_space - interrupt callback after a sendmsg EAGAIN
 * Indicates that there now is more space in the send buffer
 * The queued work is launched into tipc_send_work()->tipc_conn_send_to_sock()
 * A reference is taken for the work item and released by the handler.
 */
static void tipc_conn_write_space(struct sock *sk)
{
    struct tipc_conn *con;

    read_lock_bh(&sk->sk_callback_lock);
    con = sk->sk_user_data;
    if (connected(con)) {
        conn_get(con);
        /* If the work was already pending, drop the extra reference */
        if (!queue_work(con->server->send_wq, &con->swork))
            conn_put(con);
    }
    read_unlock_bh(&sk->sk_callback_lock);
}
0357 
/* tipc_conn_rcv_sub - process one subscription (or cancellation) request
 * Returns 0 on success, -1 when the per-namespace subscription limit is
 * reached or the subscription could not be set up.
 */
static int tipc_conn_rcv_sub(struct tipc_topsrv *srv,
                 struct tipc_conn *con,
                 struct tipc_subscr *s)
{
    struct tipc_net *tn = tipc_net(srv->net);
    struct tipc_subscription *sub;
    u32 s_filter = tipc_sub_read(s, filter);

    if (s_filter & TIPC_SUB_CANCEL) {
        /* Strip the cancel bit so the request matches the stored
         * subscription, then delete it
         */
        tipc_sub_write(s, filter, s_filter & ~TIPC_SUB_CANCEL);
        tipc_conn_delete_sub(con, s);
        return 0;
    }
    /* Enforce the per-namespace subscription limit */
    if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
        pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
        return -1;
    }
    sub = tipc_sub_subscribe(srv->net, s, con->conid);
    if (!sub)
        return -1;
    atomic_inc(&tn->subscription_count);
    spin_lock_bh(&con->sub_lock);
    list_add(&sub->sub_list, &con->sub_list);
    spin_unlock_bh(&con->sub_lock);
    return 0;
}
0384 
/* tipc_conn_rcv_from_sock - read one subscription request from the socket
 * Returns 0 on success and -EWOULDBLOCK when no data is available; any
 * other outcome (short read or failed subscription) closes the
 * connection and returns the error.
 */
static int tipc_conn_rcv_from_sock(struct tipc_conn *con)
{
    struct tipc_topsrv *srv = con->server;
    struct sock *sk = con->sock->sk;
    struct msghdr msg = {};
    struct tipc_subscr s;
    struct kvec iov;
    int ret;

    iov.iov_base = &s;
    iov.iov_len = sizeof(s);
    msg.msg_name = NULL;
    iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, iov.iov_len);
    ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
    if (ret == -EWOULDBLOCK)
        return -EWOULDBLOCK;
    if (ret == sizeof(s)) {
        read_lock_bh(&sk->sk_callback_lock);
        /* RACE: the connection can be closed in the meantime */
        if (likely(connected(con)))
            ret = tipc_conn_rcv_sub(srv, con, &s);
        read_unlock_bh(&sk->sk_callback_lock);
        if (!ret)
            return 0;
    }

    tipc_conn_close(con);
    return ret;
}
0414 
0415 static void tipc_conn_recv_work(struct work_struct *work)
0416 {
0417     struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
0418     int count = 0;
0419 
0420     while (connected(con)) {
0421         if (tipc_conn_rcv_from_sock(con))
0422             break;
0423 
0424         /* Don't flood Rx machine */
0425         if (++count >= MAX_RECV_MSG_COUNT) {
0426             cond_resched();
0427             count = 0;
0428         }
0429     }
0430     conn_put(con);
0431 }
0432 
/* tipc_conn_data_ready - interrupt callback indicating the socket has data
 * The queued work is launched into tipc_recv_work()->tipc_conn_rcv_from_sock()
 * A reference is taken for the work item and released by the handler.
 */
static void tipc_conn_data_ready(struct sock *sk)
{
    struct tipc_conn *con;

    read_lock_bh(&sk->sk_callback_lock);
    con = sk->sk_user_data;
    if (connected(con)) {
        conn_get(con);
        /* If the work was already pending, drop the extra reference */
        if (!queue_work(con->server->rcv_wq, &con->rwork))
            conn_put(con);
    }
    read_unlock_bh(&sk->sk_callback_lock);
}
0449 
/* tipc_topsrv_accept - work handler accepting new subscriber connections
 * Loops until the non-blocking accept fails (typically when the backlog
 * is empty). Each accepted socket gets a connection object, and its
 * callbacks are registered under sk_callback_lock so they only ever run
 * with a fully initialized connection.
 */
static void tipc_topsrv_accept(struct work_struct *work)
{
    struct tipc_topsrv *srv = container_of(work, struct tipc_topsrv, awork);
    struct socket *lsock = srv->listener;
    struct socket *newsock;
    struct tipc_conn *con;
    struct sock *newsk;
    int ret;

    while (1) {
        ret = kernel_accept(lsock, &newsock, O_NONBLOCK);
        if (ret < 0)
            return;
        con = tipc_conn_alloc(srv);
        if (IS_ERR(con)) {
            ret = PTR_ERR(con);
            sock_release(newsock);
            return;
        }
        /* Register callbacks */
        newsk = newsock->sk;
        write_lock_bh(&newsk->sk_callback_lock);
        newsk->sk_data_ready = tipc_conn_data_ready;
        newsk->sk_write_space = tipc_conn_write_space;
        newsk->sk_user_data = con;
        con->sock = newsock;
        write_unlock_bh(&newsk->sk_callback_lock);

        /* Wake up receive process in case of 'SYN+' message */
        newsk->sk_data_ready(newsk);
    }
}
0482 
0483 /* tipc_topsrv_listener_data_ready - interrupt callback with connection request
0484  * The queued job is launched into tipc_topsrv_accept()
0485  */
0486 static void tipc_topsrv_listener_data_ready(struct sock *sk)
0487 {
0488     struct tipc_topsrv *srv;
0489 
0490     read_lock_bh(&sk->sk_callback_lock);
0491     srv = sk->sk_user_data;
0492     if (srv->listener)
0493         queue_work(srv->rcv_wq, &srv->awork);
0494     read_unlock_bh(&sk->sk_callback_lock);
0495 }
0496 
0497 static int tipc_topsrv_create_listener(struct tipc_topsrv *srv)
0498 {
0499     struct socket *lsock = NULL;
0500     struct sockaddr_tipc saddr;
0501     struct sock *sk;
0502     int rc;
0503 
0504     rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock);
0505     if (rc < 0)
0506         return rc;
0507 
0508     srv->listener = lsock;
0509     sk = lsock->sk;
0510     write_lock_bh(&sk->sk_callback_lock);
0511     sk->sk_data_ready = tipc_topsrv_listener_data_ready;
0512     sk->sk_user_data = srv;
0513     write_unlock_bh(&sk->sk_callback_lock);
0514 
0515     lock_sock(sk);
0516     rc = tsk_set_importance(sk, TIPC_CRITICAL_IMPORTANCE);
0517     release_sock(sk);
0518     if (rc < 0)
0519         goto err;
0520 
0521     saddr.family                    = AF_TIPC;
0522     saddr.addrtype              = TIPC_SERVICE_RANGE;
0523     saddr.addr.nameseq.type = TIPC_TOP_SRV;
0524     saddr.addr.nameseq.lower    = TIPC_TOP_SRV;
0525     saddr.addr.nameseq.upper    = TIPC_TOP_SRV;
0526     saddr.scope         = TIPC_NODE_SCOPE;
0527 
0528     rc = tipc_sk_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr));
0529     if (rc < 0)
0530         goto err;
0531     rc = kernel_listen(lsock, 0);
0532     if (rc < 0)
0533         goto err;
0534 
0535     /* As server's listening socket owner and creator is the same module,
0536      * we have to decrease TIPC module reference count to guarantee that
0537      * it remains zero after the server socket is created, otherwise,
0538      * executing "rmmod" command is unable to make TIPC module deleted
0539      * after TIPC module is inserted successfully.
0540      *
0541      * However, the reference count is ever increased twice in
0542      * sock_create_kern(): one is to increase the reference count of owner
0543      * of TIPC socket's proto_ops struct; another is to increment the
0544      * reference count of owner of TIPC proto struct. Therefore, we must
0545      * decrement the module reference count twice to ensure that it keeps
0546      * zero after server's listening socket is created. Of course, we
0547      * must bump the module reference count twice as well before the socket
0548      * is closed.
0549      */
0550     module_put(lsock->ops->owner);
0551     module_put(sk->sk_prot_creator->owner);
0552 
0553     return 0;
0554 err:
0555     sock_release(lsock);
0556     return -EINVAL;
0557 }
0558 
/* tipc_topsrv_kern_subscr - subscribe on behalf of a kernel-internal user
 * Allocates a socket-less connection (its events are looped back via
 * tipc_topsrv_kern_evt()) and installs the subscription. On success the
 * connection reference is retained until tipc_topsrv_kern_unsubscr()
 * and the connection id is returned through @conid.
 * Returns true on success, false otherwise.
 */
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
                 u32 upper, u32 filter, int *conid)
{
    struct tipc_subscr sub;
    struct tipc_conn *con;
    int rc;

    sub.seq.type = type;
    sub.seq.lower = lower;
    sub.seq.upper = upper;
    sub.timeout = TIPC_WAIT_FOREVER;
    sub.filter = filter;
    /* The subscriber's port number is smuggled in the user handle */
    *(u32 *)&sub.usr_handle = port;

    con = tipc_conn_alloc(tipc_topsrv(net));
    if (IS_ERR(con))
        return false;

    *conid = con->conid;
    /* NULL socket marks this as a kernel-internal connection */
    con->sock = NULL;
    rc = tipc_conn_rcv_sub(tipc_topsrv(net), con, &sub);
    if (rc >= 0)
        return true;
    conn_put(con);
    return false;
}
0585 
/* tipc_topsrv_kern_unsubscr - remove a kernel-internal subscription
 * Marks the connection disconnected, deletes all its subscriptions and
 * drops both the lookup reference and the initial (allocation)
 * reference, which triggers release of the connection.
 */
void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{
    struct tipc_conn *con;

    con = tipc_conn_lookup(tipc_topsrv(net), conid);
    if (!con)
        return;

    test_and_clear_bit(CF_CONNECTED, &con->flags);
    tipc_conn_delete_sub(con, NULL);
    conn_put(con);
    conn_put(con);
}
0599 
/* tipc_topsrv_kern_evt - deliver an event to a kernel-internal subscriber
 * Wraps the event in a TIPC message addressed to the subscriber's port
 * (stored in the event's user handle) on the own node and injects it
 * through the loopback receive path. Delivery is best-effort: the
 * message is droppable and an allocation failure is silently ignored.
 */
static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt)
{
    u32 port = *(u32 *)&evt->s.usr_handle;
    u32 self = tipc_own_addr(net);
    struct sk_buff_head evtq;
    struct sk_buff *skb;

    skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
                  self, self, port, port, 0);
    if (!skb)
        return;
    msg_set_dest_droppable(buf_msg(skb), true);
    memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
    skb_queue_head_init(&evtq);
    __skb_queue_tail(&evtq, skb);
    tipc_loopback_trace(net, &evtq);
    tipc_sk_rcv(net, &evtq);
}
0618 
0619 static int tipc_topsrv_work_start(struct tipc_topsrv *s)
0620 {
0621     s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
0622     if (!s->rcv_wq) {
0623         pr_err("can't start tipc receive workqueue\n");
0624         return -ENOMEM;
0625     }
0626 
0627     s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
0628     if (!s->send_wq) {
0629         pr_err("can't start tipc send workqueue\n");
0630         destroy_workqueue(s->rcv_wq);
0631         return -ENOMEM;
0632     }
0633 
0634     return 0;
0635 }
0636 
/* tipc_topsrv_work_stop - destroy both server workqueues
 * destroy_workqueue() drains any still-pending work items first.
 */
static void tipc_topsrv_work_stop(struct tipc_topsrv *s)
{
    destroy_workqueue(s->rcv_wq);
    destroy_workqueue(s->send_wq);
}
0642 
/* tipc_topsrv_start - allocate and start the topology server for @net
 * Sets up the server structure, its workqueues and the listening socket.
 * Returns 0 on success or a negative error, unwinding everything that
 * was already set up.
 */
static int tipc_topsrv_start(struct net *net)
{
    struct tipc_net *tn = tipc_net(net);
    const char name[] = "topology_server";
    struct tipc_topsrv *srv;
    int ret;

    srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
    if (!srv)
        return -ENOMEM;

    srv->net = net;
    INIT_WORK(&srv->awork, tipc_topsrv_accept);

    strscpy(srv->name, name, sizeof(srv->name));
    tn->topsrv = srv;
    atomic_set(&tn->subscription_count, 0);

    spin_lock_init(&srv->idr_lock);
    idr_init(&srv->conn_idr);
    srv->idr_in_use = 0;

    ret = tipc_topsrv_work_start(srv);
    if (ret < 0)
        goto err_start;

    ret = tipc_topsrv_create_listener(srv);
    if (ret < 0)
        goto err_create;

    return 0;

err_create:
    tipc_topsrv_work_stop(srv);
err_start:
    kfree(srv);
    return ret;
}
0681 
/* tipc_topsrv_stop - tear down the topology server for a network namespace
 * Closes every remaining connection, re-takes the two module references
 * that were dropped in tipc_topsrv_create_listener(), then releases the
 * listener socket and frees all server resources.
 */
static void tipc_topsrv_stop(struct net *net)
{
    struct tipc_topsrv *srv = tipc_topsrv(net);
    struct socket *lsock = srv->listener;
    struct tipc_conn *con;
    int id;

    /* Close connections until the idr is empty; the lock is dropped
     * around each close since it may sleep and drop references
     */
    spin_lock_bh(&srv->idr_lock);
    for (id = 0; srv->idr_in_use; id++) {
        con = idr_find(&srv->conn_idr, id);
        if (con) {
            spin_unlock_bh(&srv->idr_lock);
            tipc_conn_close(con);
            spin_lock_bh(&srv->idr_lock);
        }
    }
    __module_get(lsock->ops->owner);
    __module_get(lsock->sk->sk_prot_creator->owner);
    /* Clearing the listener stops further accept work from being queued */
    srv->listener = NULL;
    spin_unlock_bh(&srv->idr_lock);
    sock_release(lsock);
    tipc_topsrv_work_stop(srv);
    idr_destroy(&srv->conn_idr);
    kfree(srv);
}
0707 
/* Per-netns init hook: start the topology server for this namespace */
int __net_init tipc_topsrv_init_net(struct net *net)
{
    return tipc_topsrv_start(net);
}
0712 
/* Per-netns exit hook: stop the topology server for this namespace */
void __net_exit tipc_topsrv_exit_net(struct net *net)
{
    tipc_topsrv_stop(net);
}