0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040 #include <linux/kernel.h>
0041 #include <linux/sched/mm.h>
0042 #include <linux/jiffies.h>
0043 #include <linux/slab.h>
0044 #include <linux/idr.h>
0045 #include <linux/kref.h>
0046 #include <linux/net.h>
0047 #include <linux/export.h>
0048 #include <net/tcp.h>
0049
0050 #include <linux/uaccess.h>
0051
0052 #include "heartbeat.h"
0053 #include "tcp.h"
0054 #include "nodemanager.h"
0055 #define MLOG_MASK_PREFIX ML_TCP
0056 #include "masklog.h"
0057 #include "quorum.h"
0058
0059 #include "tcp_internal.h"
0060
0061 #define SC_NODEF_FMT "node %s (num %u) at %pI4:%u"
0062 #define SC_NODEF_ARGS(sc) sc->sc_node->nd_name, sc->sc_node->nd_num, \
0063 &sc->sc_node->nd_ipv4_address, \
0064 ntohs(sc->sc_node->nd_ipv4_port)
0065
0066
0067
0068
0069
0070
0071 #define msglog(hdr, fmt, args...) do { \
0072 typeof(hdr) __hdr = (hdr); \
0073 mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d " \
0074 "key %08x num %u] " fmt, \
0075 be16_to_cpu(__hdr->magic), be16_to_cpu(__hdr->data_len), \
0076 be16_to_cpu(__hdr->msg_type), be32_to_cpu(__hdr->status), \
0077 be32_to_cpu(__hdr->sys_status), be32_to_cpu(__hdr->key), \
0078 be32_to_cpu(__hdr->msg_num) , ##args); \
0079 } while (0)
0080
0081 #define sclog(sc, fmt, args...) do { \
0082 typeof(sc) __sc = (sc); \
0083 mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \
0084 "pg_off %zu] " fmt, __sc, \
0085 kref_read(&__sc->sc_kref), __sc->sc_sock, \
0086 __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \
0087 ##args); \
0088 } while (0)
0089
0090 static DEFINE_RWLOCK(o2net_handler_lock);
0091 static struct rb_root o2net_handler_tree = RB_ROOT;
0092
0093 static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
0094
0095
0096 static struct socket *o2net_listen_sock;
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106 static struct workqueue_struct *o2net_wq;
0107 static struct work_struct o2net_listen_work;
0108
0109 static struct o2hb_callback_func o2net_hb_up, o2net_hb_down;
0110 #define O2NET_HB_PRI 0x1
0111
0112 static struct o2net_handshake *o2net_hand;
0113 static struct o2net_msg *o2net_keep_req, *o2net_keep_resp;
0114
0115 static int o2net_sys_err_translations[O2NET_ERR_MAX] =
0116 {[O2NET_ERR_NONE] = 0,
0117 [O2NET_ERR_NO_HNDLR] = -ENOPROTOOPT,
0118 [O2NET_ERR_OVERFLOW] = -EOVERFLOW,
0119 [O2NET_ERR_DIED] = -EHOSTDOWN,};
0120
0121
0122 static void o2net_sc_connect_completed(struct work_struct *work);
0123 static void o2net_rx_until_empty(struct work_struct *work);
0124 static void o2net_shutdown_sc(struct work_struct *work);
0125 static void o2net_listen_data_ready(struct sock *sk);
0126 static void o2net_sc_send_keep_req(struct work_struct *work);
0127 static void o2net_idle_timer(struct timer_list *t);
0128 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
0129 static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
0130
0131 #ifdef CONFIG_DEBUG_FS
0132 static void o2net_init_nst(struct o2net_send_tracking *nst, u32 msgtype,
0133 u32 msgkey, struct task_struct *task, u8 node)
0134 {
0135 INIT_LIST_HEAD(&nst->st_net_debug_item);
0136 nst->st_task = task;
0137 nst->st_msg_type = msgtype;
0138 nst->st_msg_key = msgkey;
0139 nst->st_node = node;
0140 }
0141
0142 static inline void o2net_set_nst_sock_time(struct o2net_send_tracking *nst)
0143 {
0144 nst->st_sock_time = ktime_get();
0145 }
0146
0147 static inline void o2net_set_nst_send_time(struct o2net_send_tracking *nst)
0148 {
0149 nst->st_send_time = ktime_get();
0150 }
0151
0152 static inline void o2net_set_nst_status_time(struct o2net_send_tracking *nst)
0153 {
0154 nst->st_status_time = ktime_get();
0155 }
0156
0157 static inline void o2net_set_nst_sock_container(struct o2net_send_tracking *nst,
0158 struct o2net_sock_container *sc)
0159 {
0160 nst->st_sc = sc;
0161 }
0162
0163 static inline void o2net_set_nst_msg_id(struct o2net_send_tracking *nst,
0164 u32 msg_id)
0165 {
0166 nst->st_id = msg_id;
0167 }
0168
0169 static inline void o2net_set_sock_timer(struct o2net_sock_container *sc)
0170 {
0171 sc->sc_tv_timer = ktime_get();
0172 }
0173
0174 static inline void o2net_set_data_ready_time(struct o2net_sock_container *sc)
0175 {
0176 sc->sc_tv_data_ready = ktime_get();
0177 }
0178
0179 static inline void o2net_set_advance_start_time(struct o2net_sock_container *sc)
0180 {
0181 sc->sc_tv_advance_start = ktime_get();
0182 }
0183
0184 static inline void o2net_set_advance_stop_time(struct o2net_sock_container *sc)
0185 {
0186 sc->sc_tv_advance_stop = ktime_get();
0187 }
0188
0189 static inline void o2net_set_func_start_time(struct o2net_sock_container *sc)
0190 {
0191 sc->sc_tv_func_start = ktime_get();
0192 }
0193
0194 static inline void o2net_set_func_stop_time(struct o2net_sock_container *sc)
0195 {
0196 sc->sc_tv_func_stop = ktime_get();
0197 }
0198
0199 #else
0200 # define o2net_init_nst(a, b, c, d, e)
0201 # define o2net_set_nst_sock_time(a)
0202 # define o2net_set_nst_send_time(a)
0203 # define o2net_set_nst_status_time(a)
0204 # define o2net_set_nst_sock_container(a, b)
0205 # define o2net_set_nst_msg_id(a, b)
0206 # define o2net_set_sock_timer(a)
0207 # define o2net_set_data_ready_time(a)
0208 # define o2net_set_advance_start_time(a)
0209 # define o2net_set_advance_stop_time(a)
0210 # define o2net_set_func_start_time(a)
0211 # define o2net_set_func_stop_time(a)
0212 #endif
0213
0214 #ifdef CONFIG_OCFS2_FS_STATS
0215 static ktime_t o2net_get_func_run_time(struct o2net_sock_container *sc)
0216 {
0217 return ktime_sub(sc->sc_tv_func_stop, sc->sc_tv_func_start);
0218 }
0219
0220 static void o2net_update_send_stats(struct o2net_send_tracking *nst,
0221 struct o2net_sock_container *sc)
0222 {
0223 sc->sc_tv_status_total = ktime_add(sc->sc_tv_status_total,
0224 ktime_sub(ktime_get(),
0225 nst->st_status_time));
0226 sc->sc_tv_send_total = ktime_add(sc->sc_tv_send_total,
0227 ktime_sub(nst->st_status_time,
0228 nst->st_send_time));
0229 sc->sc_tv_acquiry_total = ktime_add(sc->sc_tv_acquiry_total,
0230 ktime_sub(nst->st_send_time,
0231 nst->st_sock_time));
0232 sc->sc_send_count++;
0233 }
0234
0235 static void o2net_update_recv_stats(struct o2net_sock_container *sc)
0236 {
0237 sc->sc_tv_process_total = ktime_add(sc->sc_tv_process_total,
0238 o2net_get_func_run_time(sc));
0239 sc->sc_recv_count++;
0240 }
0241
0242 #else
0243
0244 # define o2net_update_send_stats(a, b)
0245
0246 # define o2net_update_recv_stats(sc)
0247
0248 #endif
0249
0250 static inline unsigned int o2net_reconnect_delay(void)
0251 {
0252 return o2nm_single_cluster->cl_reconnect_delay_ms;
0253 }
0254
0255 static inline unsigned int o2net_keepalive_delay(void)
0256 {
0257 return o2nm_single_cluster->cl_keepalive_delay_ms;
0258 }
0259
0260 static inline unsigned int o2net_idle_timeout(void)
0261 {
0262 return o2nm_single_cluster->cl_idle_timeout_ms;
0263 }
0264
0265 static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
0266 {
0267 int trans;
0268 BUG_ON(err >= O2NET_ERR_MAX);
0269 trans = o2net_sys_err_translations[err];
0270
0271
0272 BUG_ON(err != O2NET_ERR_NONE && trans == 0);
0273 return trans;
0274 }
0275
0276 static struct o2net_node * o2net_nn_from_num(u8 node_num)
0277 {
0278 BUG_ON(node_num >= ARRAY_SIZE(o2net_nodes));
0279 return &o2net_nodes[node_num];
0280 }
0281
0282 static u8 o2net_num_from_nn(struct o2net_node *nn)
0283 {
0284 BUG_ON(nn == NULL);
0285 return nn - o2net_nodes;
0286 }
0287
0288
0289
0290 static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw)
0291 {
0292 int ret;
0293
0294 spin_lock(&nn->nn_lock);
0295 ret = idr_alloc(&nn->nn_status_idr, nsw, 0, 0, GFP_ATOMIC);
0296 if (ret >= 0) {
0297 nsw->ns_id = ret;
0298 list_add_tail(&nsw->ns_node_item, &nn->nn_status_list);
0299 }
0300 spin_unlock(&nn->nn_lock);
0301 if (ret < 0)
0302 return ret;
0303
0304 init_waitqueue_head(&nsw->ns_wq);
0305 nsw->ns_sys_status = O2NET_ERR_NONE;
0306 nsw->ns_status = 0;
0307 return 0;
0308 }
0309
0310 static void o2net_complete_nsw_locked(struct o2net_node *nn,
0311 struct o2net_status_wait *nsw,
0312 enum o2net_system_error sys_status,
0313 s32 status)
0314 {
0315 assert_spin_locked(&nn->nn_lock);
0316
0317 if (!list_empty(&nsw->ns_node_item)) {
0318 list_del_init(&nsw->ns_node_item);
0319 nsw->ns_sys_status = sys_status;
0320 nsw->ns_status = status;
0321 idr_remove(&nn->nn_status_idr, nsw->ns_id);
0322 wake_up(&nsw->ns_wq);
0323 }
0324 }
0325
0326 static void o2net_complete_nsw(struct o2net_node *nn,
0327 struct o2net_status_wait *nsw,
0328 u64 id, enum o2net_system_error sys_status,
0329 s32 status)
0330 {
0331 spin_lock(&nn->nn_lock);
0332 if (nsw == NULL) {
0333 if (id > INT_MAX)
0334 goto out;
0335
0336 nsw = idr_find(&nn->nn_status_idr, id);
0337 if (nsw == NULL)
0338 goto out;
0339 }
0340
0341 o2net_complete_nsw_locked(nn, nsw, sys_status, status);
0342
0343 out:
0344 spin_unlock(&nn->nn_lock);
0345 return;
0346 }
0347
0348 static void o2net_complete_nodes_nsw(struct o2net_node *nn)
0349 {
0350 struct o2net_status_wait *nsw, *tmp;
0351 unsigned int num_kills = 0;
0352
0353 assert_spin_locked(&nn->nn_lock);
0354
0355 list_for_each_entry_safe(nsw, tmp, &nn->nn_status_list, ns_node_item) {
0356 o2net_complete_nsw_locked(nn, nsw, O2NET_ERR_DIED, 0);
0357 num_kills++;
0358 }
0359
0360 mlog(0, "completed %d messages for node %u\n", num_kills,
0361 o2net_num_from_nn(nn));
0362 }
0363
0364 static int o2net_nsw_completed(struct o2net_node *nn,
0365 struct o2net_status_wait *nsw)
0366 {
0367 int completed;
0368 spin_lock(&nn->nn_lock);
0369 completed = list_empty(&nsw->ns_node_item);
0370 spin_unlock(&nn->nn_lock);
0371 return completed;
0372 }
0373
0374
0375
0376 static void sc_kref_release(struct kref *kref)
0377 {
0378 struct o2net_sock_container *sc = container_of(kref,
0379 struct o2net_sock_container, sc_kref);
0380 BUG_ON(timer_pending(&sc->sc_idle_timeout));
0381
0382 sclog(sc, "releasing\n");
0383
0384 if (sc->sc_sock) {
0385 sock_release(sc->sc_sock);
0386 sc->sc_sock = NULL;
0387 }
0388
0389 o2nm_undepend_item(&sc->sc_node->nd_item);
0390 o2nm_node_put(sc->sc_node);
0391 sc->sc_node = NULL;
0392
0393 o2net_debug_del_sc(sc);
0394
0395 if (sc->sc_page)
0396 __free_page(sc->sc_page);
0397 kfree(sc);
0398 }
0399
0400 static void sc_put(struct o2net_sock_container *sc)
0401 {
0402 sclog(sc, "put\n");
0403 kref_put(&sc->sc_kref, sc_kref_release);
0404 }
0405 static void sc_get(struct o2net_sock_container *sc)
0406 {
0407 sclog(sc, "get\n");
0408 kref_get(&sc->sc_kref);
0409 }
0410 static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
0411 {
0412 struct o2net_sock_container *sc, *ret = NULL;
0413 struct page *page = NULL;
0414 int status = 0;
0415
0416 page = alloc_page(GFP_NOFS);
0417 sc = kzalloc(sizeof(*sc), GFP_NOFS);
0418 if (sc == NULL || page == NULL)
0419 goto out;
0420
0421 kref_init(&sc->sc_kref);
0422 o2nm_node_get(node);
0423 sc->sc_node = node;
0424
0425
0426 status = o2nm_depend_item(&node->nd_item);
0427 if (status) {
0428 mlog_errno(status);
0429 o2nm_node_put(node);
0430 goto out;
0431 }
0432 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed);
0433 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty);
0434 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc);
0435 INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req);
0436
0437 timer_setup(&sc->sc_idle_timeout, o2net_idle_timer, 0);
0438
0439 sclog(sc, "alloced\n");
0440
0441 ret = sc;
0442 sc->sc_page = page;
0443 o2net_debug_add_sc(sc);
0444 sc = NULL;
0445 page = NULL;
0446
0447 out:
0448 if (page)
0449 __free_page(page);
0450 kfree(sc);
0451
0452 return ret;
0453 }
0454
0455
0456
0457 static void o2net_sc_queue_work(struct o2net_sock_container *sc,
0458 struct work_struct *work)
0459 {
0460 sc_get(sc);
0461 if (!queue_work(o2net_wq, work))
0462 sc_put(sc);
0463 }
0464 static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
0465 struct delayed_work *work,
0466 int delay)
0467 {
0468 sc_get(sc);
0469 if (!queue_delayed_work(o2net_wq, work, delay))
0470 sc_put(sc);
0471 }
0472 static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
0473 struct delayed_work *work)
0474 {
0475 if (cancel_delayed_work(work))
0476 sc_put(sc);
0477 }
0478
0479 static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
0480
0481 int o2net_num_connected_peers(void)
0482 {
0483 return atomic_read(&o2net_connected_peers);
0484 }
0485
0486 static void o2net_set_nn_state(struct o2net_node *nn,
0487 struct o2net_sock_container *sc,
0488 unsigned valid, int err)
0489 {
0490 int was_valid = nn->nn_sc_valid;
0491 int was_err = nn->nn_persistent_error;
0492 struct o2net_sock_container *old_sc = nn->nn_sc;
0493
0494 assert_spin_locked(&nn->nn_lock);
0495
0496 if (old_sc && !sc)
0497 atomic_dec(&o2net_connected_peers);
0498 else if (!old_sc && sc)
0499 atomic_inc(&o2net_connected_peers);
0500
0501
0502
0503 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
0504 mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
0505 mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
0506
0507 if (was_valid && !valid && err == 0)
0508 err = -ENOTCONN;
0509
0510 mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
0511 o2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid,
0512 nn->nn_persistent_error, err);
0513
0514 nn->nn_sc = sc;
0515 nn->nn_sc_valid = valid ? 1 : 0;
0516 nn->nn_persistent_error = err;
0517
0518
0519 if (nn->nn_persistent_error || nn->nn_sc_valid)
0520 wake_up(&nn->nn_sc_wq);
0521
0522 if (was_valid && !was_err && nn->nn_persistent_error) {
0523 o2quo_conn_err(o2net_num_from_nn(nn));
0524 queue_delayed_work(o2net_wq, &nn->nn_still_up,
0525 msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
0526 }
0527
0528 if (was_valid && !valid) {
0529 if (old_sc)
0530 printk(KERN_NOTICE "o2net: No longer connected to "
0531 SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
0532 o2net_complete_nodes_nsw(nn);
0533 }
0534
0535 if (!was_valid && valid) {
0536 o2quo_conn_up(o2net_num_from_nn(nn));
0537 cancel_delayed_work(&nn->nn_connect_expired);
0538 printk(KERN_NOTICE "o2net: %s " SC_NODEF_FMT "\n",
0539 o2nm_this_node() > sc->sc_node->nd_num ?
0540 "Connected to" : "Accepted connection from",
0541 SC_NODEF_ARGS(sc));
0542 }
0543
0544
0545
0546
0547
0548 if (!valid && o2net_wq) {
0549 unsigned long delay;
0550
0551
0552 delay = (nn->nn_last_connect_attempt +
0553 msecs_to_jiffies(o2net_reconnect_delay()))
0554 - jiffies;
0555 if (delay > msecs_to_jiffies(o2net_reconnect_delay()))
0556 delay = 0;
0557 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
0558 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
0559
0560
0561
0562
0563
0564
0565
0566
0567
0568
0569 delay += msecs_to_jiffies(o2net_idle_timeout());
0570 queue_delayed_work(o2net_wq, &nn->nn_connect_expired, delay);
0571 }
0572
0573
0574 if ((old_sc == NULL) && sc)
0575 sc_get(sc);
0576 if (old_sc && (old_sc != sc)) {
0577 o2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work);
0578 sc_put(old_sc);
0579 }
0580 }
0581
0582
0583 static void o2net_data_ready(struct sock *sk)
0584 {
0585 void (*ready)(struct sock *sk);
0586 struct o2net_sock_container *sc;
0587
0588 read_lock_bh(&sk->sk_callback_lock);
0589 sc = sk->sk_user_data;
0590 if (sc) {
0591 sclog(sc, "data_ready hit\n");
0592 o2net_set_data_ready_time(sc);
0593 o2net_sc_queue_work(sc, &sc->sc_rx_work);
0594 ready = sc->sc_data_ready;
0595 } else {
0596 ready = sk->sk_data_ready;
0597 }
0598 read_unlock_bh(&sk->sk_callback_lock);
0599
0600 ready(sk);
0601 }
0602
0603
0604 static void o2net_state_change(struct sock *sk)
0605 {
0606 void (*state_change)(struct sock *sk);
0607 struct o2net_sock_container *sc;
0608
0609 read_lock_bh(&sk->sk_callback_lock);
0610 sc = sk->sk_user_data;
0611 if (sc == NULL) {
0612 state_change = sk->sk_state_change;
0613 goto out;
0614 }
0615
0616 sclog(sc, "state_change to %d\n", sk->sk_state);
0617
0618 state_change = sc->sc_state_change;
0619
0620 switch(sk->sk_state) {
0621
0622 case TCP_SYN_SENT:
0623 case TCP_SYN_RECV:
0624 break;
0625 case TCP_ESTABLISHED:
0626 o2net_sc_queue_work(sc, &sc->sc_connect_work);
0627 break;
0628 default:
0629 printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT
0630 " shutdown, state %d\n",
0631 SC_NODEF_ARGS(sc), sk->sk_state);
0632 o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
0633 break;
0634 }
0635 out:
0636 read_unlock_bh(&sk->sk_callback_lock);
0637 state_change(sk);
0638 }
0639
0640
0641
0642
0643
0644
0645 static void o2net_register_callbacks(struct sock *sk,
0646 struct o2net_sock_container *sc)
0647 {
0648 write_lock_bh(&sk->sk_callback_lock);
0649
0650
0651 if (sk->sk_data_ready == o2net_listen_data_ready) {
0652 sk->sk_data_ready = sk->sk_user_data;
0653 sk->sk_user_data = NULL;
0654 }
0655
0656 BUG_ON(sk->sk_user_data != NULL);
0657 sk->sk_user_data = sc;
0658 sc_get(sc);
0659
0660 sc->sc_data_ready = sk->sk_data_ready;
0661 sc->sc_state_change = sk->sk_state_change;
0662 sk->sk_data_ready = o2net_data_ready;
0663 sk->sk_state_change = o2net_state_change;
0664
0665 mutex_init(&sc->sc_send_lock);
0666
0667 write_unlock_bh(&sk->sk_callback_lock);
0668 }
0669
0670 static int o2net_unregister_callbacks(struct sock *sk,
0671 struct o2net_sock_container *sc)
0672 {
0673 int ret = 0;
0674
0675 write_lock_bh(&sk->sk_callback_lock);
0676 if (sk->sk_user_data == sc) {
0677 ret = 1;
0678 sk->sk_user_data = NULL;
0679 sk->sk_data_ready = sc->sc_data_ready;
0680 sk->sk_state_change = sc->sc_state_change;
0681 }
0682 write_unlock_bh(&sk->sk_callback_lock);
0683
0684 return ret;
0685 }
0686
0687
0688
0689
0690
0691
0692
0693 static void o2net_ensure_shutdown(struct o2net_node *nn,
0694 struct o2net_sock_container *sc,
0695 int err)
0696 {
0697 spin_lock(&nn->nn_lock);
0698 if (nn->nn_sc == sc)
0699 o2net_set_nn_state(nn, NULL, 0, err);
0700 spin_unlock(&nn->nn_lock);
0701 }
0702
0703
0704
0705
0706
0707
0708
0709
0710
0711 static void o2net_shutdown_sc(struct work_struct *work)
0712 {
0713 struct o2net_sock_container *sc =
0714 container_of(work, struct o2net_sock_container,
0715 sc_shutdown_work);
0716 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
0717
0718 sclog(sc, "shutting down\n");
0719
0720
0721 if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
0722
0723
0724 del_timer_sync(&sc->sc_idle_timeout);
0725 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
0726 sc_put(sc);
0727 kernel_sock_shutdown(sc->sc_sock, SHUT_RDWR);
0728 }
0729
0730
0731
0732 o2net_ensure_shutdown(nn, sc, 0);
0733 sc_put(sc);
0734 }
0735
0736
0737
0738 static int o2net_handler_cmp(struct o2net_msg_handler *nmh, u32 msg_type,
0739 u32 key)
0740 {
0741 int ret = memcmp(&nmh->nh_key, &key, sizeof(key));
0742
0743 if (ret == 0)
0744 ret = memcmp(&nmh->nh_msg_type, &msg_type, sizeof(msg_type));
0745
0746 return ret;
0747 }
0748
0749 static struct o2net_msg_handler *
0750 o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
0751 struct rb_node **ret_parent)
0752 {
0753 struct rb_node **p = &o2net_handler_tree.rb_node;
0754 struct rb_node *parent = NULL;
0755 struct o2net_msg_handler *nmh, *ret = NULL;
0756 int cmp;
0757
0758 while (*p) {
0759 parent = *p;
0760 nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
0761 cmp = o2net_handler_cmp(nmh, msg_type, key);
0762
0763 if (cmp < 0)
0764 p = &(*p)->rb_left;
0765 else if (cmp > 0)
0766 p = &(*p)->rb_right;
0767 else {
0768 ret = nmh;
0769 break;
0770 }
0771 }
0772
0773 if (ret_p != NULL)
0774 *ret_p = p;
0775 if (ret_parent != NULL)
0776 *ret_parent = parent;
0777
0778 return ret;
0779 }
0780
0781 static void o2net_handler_kref_release(struct kref *kref)
0782 {
0783 struct o2net_msg_handler *nmh;
0784 nmh = container_of(kref, struct o2net_msg_handler, nh_kref);
0785
0786 kfree(nmh);
0787 }
0788
0789 static void o2net_handler_put(struct o2net_msg_handler *nmh)
0790 {
0791 kref_put(&nmh->nh_kref, o2net_handler_kref_release);
0792 }
0793
0794
0795
0796 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
0797 o2net_msg_handler_func *func, void *data,
0798 o2net_post_msg_handler_func *post_func,
0799 struct list_head *unreg_list)
0800 {
0801 struct o2net_msg_handler *nmh = NULL;
0802 struct rb_node **p, *parent;
0803 int ret = 0;
0804
0805 if (max_len > O2NET_MAX_PAYLOAD_BYTES) {
0806 mlog(0, "max_len for message handler out of range: %u\n",
0807 max_len);
0808 ret = -EINVAL;
0809 goto out;
0810 }
0811
0812 if (!msg_type) {
0813 mlog(0, "no message type provided: %u, %p\n", msg_type, func);
0814 ret = -EINVAL;
0815 goto out;
0816
0817 }
0818 if (!func) {
0819 mlog(0, "no message handler provided: %u, %p\n",
0820 msg_type, func);
0821 ret = -EINVAL;
0822 goto out;
0823 }
0824
0825 nmh = kzalloc(sizeof(struct o2net_msg_handler), GFP_NOFS);
0826 if (nmh == NULL) {
0827 ret = -ENOMEM;
0828 goto out;
0829 }
0830
0831 nmh->nh_func = func;
0832 nmh->nh_func_data = data;
0833 nmh->nh_post_func = post_func;
0834 nmh->nh_msg_type = msg_type;
0835 nmh->nh_max_len = max_len;
0836 nmh->nh_key = key;
0837
0838
0839 kref_init(&nmh->nh_kref);
0840 INIT_LIST_HEAD(&nmh->nh_unregister_item);
0841
0842 write_lock(&o2net_handler_lock);
0843 if (o2net_handler_tree_lookup(msg_type, key, &p, &parent))
0844 ret = -EEXIST;
0845 else {
0846 rb_link_node(&nmh->nh_node, parent, p);
0847 rb_insert_color(&nmh->nh_node, &o2net_handler_tree);
0848 list_add_tail(&nmh->nh_unregister_item, unreg_list);
0849
0850 mlog(ML_TCP, "registered handler func %p type %u key %08x\n",
0851 func, msg_type, key);
0852
0853 mlog_bug_on_msg(o2net_handler_tree_lookup(msg_type, key, &p,
0854 &parent) == NULL,
0855 "couldn't find handler we *just* registered "
0856 "for type %u key %08x\n", msg_type, key);
0857 }
0858 write_unlock(&o2net_handler_lock);
0859
0860 out:
0861 if (ret)
0862 kfree(nmh);
0863
0864 return ret;
0865 }
0866 EXPORT_SYMBOL_GPL(o2net_register_handler);
0867
0868 void o2net_unregister_handler_list(struct list_head *list)
0869 {
0870 struct o2net_msg_handler *nmh, *n;
0871
0872 write_lock(&o2net_handler_lock);
0873 list_for_each_entry_safe(nmh, n, list, nh_unregister_item) {
0874 mlog(ML_TCP, "unregistering handler func %p type %u key %08x\n",
0875 nmh->nh_func, nmh->nh_msg_type, nmh->nh_key);
0876 rb_erase(&nmh->nh_node, &o2net_handler_tree);
0877 list_del_init(&nmh->nh_unregister_item);
0878 kref_put(&nmh->nh_kref, o2net_handler_kref_release);
0879 }
0880 write_unlock(&o2net_handler_lock);
0881 }
0882 EXPORT_SYMBOL_GPL(o2net_unregister_handler_list);
0883
0884 static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key)
0885 {
0886 struct o2net_msg_handler *nmh;
0887
0888 read_lock(&o2net_handler_lock);
0889 nmh = o2net_handler_tree_lookup(msg_type, key, NULL, NULL);
0890 if (nmh)
0891 kref_get(&nmh->nh_kref);
0892 read_unlock(&o2net_handler_lock);
0893
0894 return nmh;
0895 }
0896
0897
0898
0899 static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
0900 {
0901 struct kvec vec = { .iov_len = len, .iov_base = data, };
0902 struct msghdr msg = { .msg_flags = MSG_DONTWAIT, };
0903 iov_iter_kvec(&msg.msg_iter, READ, &vec, 1, len);
0904 return sock_recvmsg(sock, &msg, MSG_DONTWAIT);
0905 }
0906
0907 static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec,
0908 size_t veclen, size_t total)
0909 {
0910 int ret;
0911 struct msghdr msg = {.msg_flags = 0,};
0912
0913 if (sock == NULL) {
0914 ret = -EINVAL;
0915 goto out;
0916 }
0917
0918 ret = kernel_sendmsg(sock, &msg, vec, veclen, total);
0919 if (likely(ret == total))
0920 return 0;
0921 mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, total);
0922 if (ret >= 0)
0923 ret = -EPIPE;
0924 out:
0925 mlog(0, "returning error: %d\n", ret);
0926 return ret;
0927 }
0928
0929 static void o2net_sendpage(struct o2net_sock_container *sc,
0930 void *kmalloced_virt,
0931 size_t size)
0932 {
0933 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
0934 ssize_t ret;
0935
0936 while (1) {
0937 mutex_lock(&sc->sc_send_lock);
0938 ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
0939 virt_to_page(kmalloced_virt),
0940 offset_in_page(kmalloced_virt),
0941 size, MSG_DONTWAIT);
0942 mutex_unlock(&sc->sc_send_lock);
0943 if (ret == size)
0944 break;
0945 if (ret == (ssize_t)-EAGAIN) {
0946 mlog(0, "sendpage of size %zu to " SC_NODEF_FMT
0947 " returned EAGAIN\n", size, SC_NODEF_ARGS(sc));
0948 cond_resched();
0949 continue;
0950 }
0951 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
0952 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
0953 o2net_ensure_shutdown(nn, sc, 0);
0954 break;
0955 }
0956 }
0957
0958 static void o2net_init_msg(struct o2net_msg *msg, u16 data_len, u16 msg_type, u32 key)
0959 {
0960 memset(msg, 0, sizeof(struct o2net_msg));
0961 msg->magic = cpu_to_be16(O2NET_MSG_MAGIC);
0962 msg->data_len = cpu_to_be16(data_len);
0963 msg->msg_type = cpu_to_be16(msg_type);
0964 msg->sys_status = cpu_to_be32(O2NET_ERR_NONE);
0965 msg->status = 0;
0966 msg->key = cpu_to_be32(key);
0967 }
0968
0969 static int o2net_tx_can_proceed(struct o2net_node *nn,
0970 struct o2net_sock_container **sc_ret,
0971 int *error)
0972 {
0973 int ret = 0;
0974
0975 spin_lock(&nn->nn_lock);
0976 if (nn->nn_persistent_error) {
0977 ret = 1;
0978 *sc_ret = NULL;
0979 *error = nn->nn_persistent_error;
0980 } else if (nn->nn_sc_valid) {
0981 kref_get(&nn->nn_sc->sc_kref);
0982
0983 ret = 1;
0984 *sc_ret = nn->nn_sc;
0985 *error = 0;
0986 }
0987 spin_unlock(&nn->nn_lock);
0988
0989 return ret;
0990 }
0991
0992
0993 void o2net_fill_node_map(unsigned long *map, unsigned bytes)
0994 {
0995 struct o2net_sock_container *sc;
0996 int node, ret;
0997
0998 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
0999
1000 memset(map, 0, bytes);
1001 for (node = 0; node < O2NM_MAX_NODES; ++node) {
1002 if (!o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret))
1003 continue;
1004 if (!ret) {
1005 set_bit(node, map);
1006 sc_put(sc);
1007 }
1008 }
1009 }
1010 EXPORT_SYMBOL_GPL(o2net_fill_node_map);
1011
1012 int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
1013 size_t caller_veclen, u8 target_node, int *status)
1014 {
1015 int ret = 0;
1016 struct o2net_msg *msg = NULL;
1017 size_t veclen, caller_bytes = 0;
1018 struct kvec *vec = NULL;
1019 struct o2net_sock_container *sc = NULL;
1020 struct o2net_node *nn = o2net_nn_from_num(target_node);
1021 struct o2net_status_wait nsw = {
1022 .ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
1023 };
1024 struct o2net_send_tracking nst;
1025
1026 o2net_init_nst(&nst, msg_type, key, current, target_node);
1027
1028 if (o2net_wq == NULL) {
1029 mlog(0, "attempt to tx without o2netd running\n");
1030 ret = -ESRCH;
1031 goto out;
1032 }
1033
1034 if (caller_veclen == 0) {
1035 mlog(0, "bad kvec array length\n");
1036 ret = -EINVAL;
1037 goto out;
1038 }
1039
1040 caller_bytes = iov_length((struct iovec *)caller_vec, caller_veclen);
1041 if (caller_bytes > O2NET_MAX_PAYLOAD_BYTES) {
1042 mlog(0, "total payload len %zu too large\n", caller_bytes);
1043 ret = -EINVAL;
1044 goto out;
1045 }
1046
1047 if (target_node == o2nm_this_node()) {
1048 ret = -ELOOP;
1049 goto out;
1050 }
1051
1052 o2net_debug_add_nst(&nst);
1053
1054 o2net_set_nst_sock_time(&nst);
1055
1056 wait_event(nn->nn_sc_wq, o2net_tx_can_proceed(nn, &sc, &ret));
1057 if (ret)
1058 goto out;
1059
1060 o2net_set_nst_sock_container(&nst, sc);
1061
1062 veclen = caller_veclen + 1;
1063 vec = kmalloc_array(veclen, sizeof(struct kvec), GFP_ATOMIC);
1064 if (vec == NULL) {
1065 mlog(0, "failed to %zu element kvec!\n", veclen);
1066 ret = -ENOMEM;
1067 goto out;
1068 }
1069
1070 msg = kmalloc(sizeof(struct o2net_msg), GFP_ATOMIC);
1071 if (!msg) {
1072 mlog(0, "failed to allocate a o2net_msg!\n");
1073 ret = -ENOMEM;
1074 goto out;
1075 }
1076
1077 o2net_init_msg(msg, caller_bytes, msg_type, key);
1078
1079 vec[0].iov_len = sizeof(struct o2net_msg);
1080 vec[0].iov_base = msg;
1081 memcpy(&vec[1], caller_vec, caller_veclen * sizeof(struct kvec));
1082
1083 ret = o2net_prep_nsw(nn, &nsw);
1084 if (ret)
1085 goto out;
1086
1087 msg->msg_num = cpu_to_be32(nsw.ns_id);
1088 o2net_set_nst_msg_id(&nst, nsw.ns_id);
1089
1090 o2net_set_nst_send_time(&nst);
1091
1092
1093
1094 mutex_lock(&sc->sc_send_lock);
1095 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
1096 sizeof(struct o2net_msg) + caller_bytes);
1097 mutex_unlock(&sc->sc_send_lock);
1098 msglog(msg, "sending returned %d\n", ret);
1099 if (ret < 0) {
1100 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
1101 goto out;
1102 }
1103
1104
1105 o2net_set_nst_status_time(&nst);
1106 wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw));
1107
1108 o2net_update_send_stats(&nst, sc);
1109
1110
1111
1112
1113 ret = o2net_sys_err_to_errno(nsw.ns_sys_status);
1114 if (status && !ret)
1115 *status = nsw.ns_status;
1116
1117 mlog(0, "woken, returning system status %d, user status %d\n",
1118 ret, nsw.ns_status);
1119 out:
1120 o2net_debug_del_nst(&nst);
1121 if (sc)
1122 sc_put(sc);
1123 kfree(vec);
1124 kfree(msg);
1125 o2net_complete_nsw(nn, &nsw, 0, 0, 0);
1126 return ret;
1127 }
1128 EXPORT_SYMBOL_GPL(o2net_send_message_vec);
1129
1130 int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
1131 u8 target_node, int *status)
1132 {
1133 struct kvec vec = {
1134 .iov_base = data,
1135 .iov_len = len,
1136 };
1137 return o2net_send_message_vec(msg_type, key, &vec, 1,
1138 target_node, status);
1139 }
1140 EXPORT_SYMBOL_GPL(o2net_send_message);
1141
1142 static int o2net_send_status_magic(struct socket *sock, struct o2net_msg *hdr,
1143 enum o2net_system_error syserr, int err)
1144 {
1145 struct kvec vec = {
1146 .iov_base = hdr,
1147 .iov_len = sizeof(struct o2net_msg),
1148 };
1149
1150 BUG_ON(syserr >= O2NET_ERR_MAX);
1151
1152
1153
1154 hdr->sys_status = cpu_to_be32(syserr);
1155 hdr->status = cpu_to_be32(err);
1156 hdr->magic = cpu_to_be16(O2NET_MSG_STATUS_MAGIC);
1157 hdr->data_len = 0;
1158
1159 msglog(hdr, "about to send status magic %d\n", err);
1160
1161 return o2net_send_tcp_msg(sock, &vec, 1, sizeof(struct o2net_msg));
1162 }
1163
1164
1165
1166 static int o2net_process_message(struct o2net_sock_container *sc,
1167 struct o2net_msg *hdr)
1168 {
1169 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1170 int ret = 0, handler_status;
1171 enum o2net_system_error syserr;
1172 struct o2net_msg_handler *nmh = NULL;
1173 void *ret_data = NULL;
1174
1175 msglog(hdr, "processing message\n");
1176
1177 o2net_sc_postpone_idle(sc);
1178
1179 switch(be16_to_cpu(hdr->magic)) {
1180 case O2NET_MSG_STATUS_MAGIC:
1181
1182 o2net_complete_nsw(nn, NULL,
1183 be32_to_cpu(hdr->msg_num),
1184 be32_to_cpu(hdr->sys_status),
1185 be32_to_cpu(hdr->status));
1186 goto out;
1187 case O2NET_MSG_KEEP_REQ_MAGIC:
1188 o2net_sendpage(sc, o2net_keep_resp,
1189 sizeof(*o2net_keep_resp));
1190 goto out;
1191 case O2NET_MSG_KEEP_RESP_MAGIC:
1192 goto out;
1193 case O2NET_MSG_MAGIC:
1194 break;
1195 default:
1196 msglog(hdr, "bad magic\n");
1197 ret = -EINVAL;
1198 goto out;
1199 }
1200
1201
1202 handler_status = 0;
1203 nmh = o2net_handler_get(be16_to_cpu(hdr->msg_type),
1204 be32_to_cpu(hdr->key));
1205 if (!nmh) {
1206 mlog(ML_TCP, "couldn't find handler for type %u key %08x\n",
1207 be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key));
1208 syserr = O2NET_ERR_NO_HNDLR;
1209 goto out_respond;
1210 }
1211
1212 syserr = O2NET_ERR_NONE;
1213
1214 if (be16_to_cpu(hdr->data_len) > nmh->nh_max_len)
1215 syserr = O2NET_ERR_OVERFLOW;
1216
1217 if (syserr != O2NET_ERR_NONE)
1218 goto out_respond;
1219
1220 o2net_set_func_start_time(sc);
1221 sc->sc_msg_key = be32_to_cpu(hdr->key);
1222 sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
1223 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
1224 be16_to_cpu(hdr->data_len),
1225 nmh->nh_func_data, &ret_data);
1226 o2net_set_func_stop_time(sc);
1227
1228 o2net_update_recv_stats(sc);
1229
1230 out_respond:
1231
1232 mutex_lock(&sc->sc_send_lock);
1233 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
1234 handler_status);
1235 mutex_unlock(&sc->sc_send_lock);
1236 hdr = NULL;
1237 mlog(0, "sending handler status %d, syserr %d returned %d\n",
1238 handler_status, syserr, ret);
1239
1240 if (nmh) {
1241 BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
1242 if (nmh->nh_post_func)
1243 (nmh->nh_post_func)(handler_status, nmh->nh_func_data,
1244 ret_data);
1245 }
1246
1247 out:
1248 if (nmh)
1249 o2net_handler_put(nmh);
1250 return ret;
1251 }
1252
1253 static int o2net_check_handshake(struct o2net_sock_container *sc)
1254 {
1255 struct o2net_handshake *hand = page_address(sc->sc_page);
1256 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1257
1258 if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) {
1259 printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " Advertised net "
1260 "protocol version %llu but %llu is required. "
1261 "Disconnecting.\n", SC_NODEF_ARGS(sc),
1262 (unsigned long long)be64_to_cpu(hand->protocol_version),
1263 O2NET_PROTOCOL_VERSION);
1264
1265
1266 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1267 return -1;
1268 }
1269
1270
1271
1272
1273
1274
1275 if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
1276 o2net_idle_timeout()) {
1277 printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a network "
1278 "idle timeout of %u ms, but we use %u ms locally. "
1279 "Disconnecting.\n", SC_NODEF_ARGS(sc),
1280 be32_to_cpu(hand->o2net_idle_timeout_ms),
1281 o2net_idle_timeout());
1282 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1283 return -1;
1284 }
1285
1286 if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
1287 o2net_keepalive_delay()) {
1288 printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a keepalive "
1289 "delay of %u ms, but we use %u ms locally. "
1290 "Disconnecting.\n", SC_NODEF_ARGS(sc),
1291 be32_to_cpu(hand->o2net_keepalive_delay_ms),
1292 o2net_keepalive_delay());
1293 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1294 return -1;
1295 }
1296
1297 if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
1298 O2HB_MAX_WRITE_TIMEOUT_MS) {
1299 printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a heartbeat "
1300 "timeout of %u ms, but we use %u ms locally. "
1301 "Disconnecting.\n", SC_NODEF_ARGS(sc),
1302 be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
1303 O2HB_MAX_WRITE_TIMEOUT_MS);
1304 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1305 return -1;
1306 }
1307
1308 sc->sc_handshake_ok = 1;
1309
1310 spin_lock(&nn->nn_lock);
1311
1312
1313 if (nn->nn_sc == sc) {
1314 o2net_sc_reset_idle_timer(sc);
1315 atomic_set(&nn->nn_timeout, 0);
1316 o2net_set_nn_state(nn, sc, 1, 0);
1317 }
1318 spin_unlock(&nn->nn_lock);
1319
1320
1321 sc->sc_page_off -= sizeof(struct o2net_handshake);
1322 if (sc->sc_page_off)
1323 memmove(hand, hand + 1, sc->sc_page_off);
1324
1325 return 0;
1326 }
1327
1328
1329
1330
1331 static int o2net_advance_rx(struct o2net_sock_container *sc)
1332 {
1333 struct o2net_msg *hdr;
1334 int ret = 0;
1335 void *data;
1336 size_t datalen;
1337
1338 sclog(sc, "receiving\n");
1339 o2net_set_advance_start_time(sc);
1340
1341 if (unlikely(sc->sc_handshake_ok == 0)) {
1342 if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
1343 data = page_address(sc->sc_page) + sc->sc_page_off;
1344 datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
1345 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1346 if (ret > 0)
1347 sc->sc_page_off += ret;
1348 }
1349
1350 if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
1351 o2net_check_handshake(sc);
1352 if (unlikely(sc->sc_handshake_ok == 0))
1353 ret = -EPROTO;
1354 }
1355 goto out;
1356 }
1357
1358
1359 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1360 data = page_address(sc->sc_page) + sc->sc_page_off;
1361 datalen = sizeof(struct o2net_msg) - sc->sc_page_off;
1362 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1363 if (ret > 0) {
1364 sc->sc_page_off += ret;
1365
1366
1367
1368 if (sc->sc_page_off == sizeof(struct o2net_msg)) {
1369 hdr = page_address(sc->sc_page);
1370 if (be16_to_cpu(hdr->data_len) >
1371 O2NET_MAX_PAYLOAD_BYTES)
1372 ret = -EOVERFLOW;
1373 }
1374 }
1375 if (ret <= 0)
1376 goto out;
1377 }
1378
1379 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1380
1381 goto out;
1382 }
1383
1384
1385 hdr = page_address(sc->sc_page);
1386
1387 msglog(hdr, "at page_off %zu\n", sc->sc_page_off);
1388
1389
1390 if (sc->sc_page_off - sizeof(struct o2net_msg) < be16_to_cpu(hdr->data_len)) {
1391
1392 data = page_address(sc->sc_page) + sc->sc_page_off;
1393 datalen = (sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len)) -
1394 sc->sc_page_off;
1395 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1396 if (ret > 0)
1397 sc->sc_page_off += ret;
1398 if (ret <= 0)
1399 goto out;
1400 }
1401
1402 if (sc->sc_page_off - sizeof(struct o2net_msg) == be16_to_cpu(hdr->data_len)) {
1403
1404
1405
1406 ret = o2net_process_message(sc, hdr);
1407 if (ret == 0)
1408 ret = 1;
1409 sc->sc_page_off = 0;
1410 }
1411
1412 out:
1413 sclog(sc, "ret = %d\n", ret);
1414 o2net_set_advance_stop_time(sc);
1415 return ret;
1416 }
1417
1418
1419
1420
1421 static void o2net_rx_until_empty(struct work_struct *work)
1422 {
1423 struct o2net_sock_container *sc =
1424 container_of(work, struct o2net_sock_container, sc_rx_work);
1425 int ret;
1426
1427 do {
1428 ret = o2net_advance_rx(sc);
1429 } while (ret > 0);
1430
1431 if (ret <= 0 && ret != -EAGAIN) {
1432 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1433 sclog(sc, "saw error %d, closing\n", ret);
1434
1435 o2net_ensure_shutdown(nn, sc, 0);
1436 }
1437
1438 sc_put(sc);
1439 }
1440
1441 static void o2net_initialize_handshake(void)
1442 {
1443 o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
1444 O2HB_MAX_WRITE_TIMEOUT_MS);
1445 o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(o2net_idle_timeout());
1446 o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
1447 o2net_keepalive_delay());
1448 o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
1449 o2net_reconnect_delay());
1450 }
1451
1452
1453
1454
1455
1456 static void o2net_sc_connect_completed(struct work_struct *work)
1457 {
1458 struct o2net_sock_container *sc =
1459 container_of(work, struct o2net_sock_container,
1460 sc_connect_work);
1461
1462 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
1463 (unsigned long long)O2NET_PROTOCOL_VERSION,
1464 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1465
1466 o2net_initialize_handshake();
1467 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1468 sc_put(sc);
1469 }
1470
1471
1472 static void o2net_sc_send_keep_req(struct work_struct *work)
1473 {
1474 struct o2net_sock_container *sc =
1475 container_of(work, struct o2net_sock_container,
1476 sc_keepalive_work.work);
1477
1478 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req));
1479 sc_put(sc);
1480 }
1481
1482
1483
1484
1485 static void o2net_idle_timer(struct timer_list *t)
1486 {
1487 struct o2net_sock_container *sc = from_timer(sc, t, sc_idle_timeout);
1488 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1489 #ifdef CONFIG_DEBUG_FS
1490 unsigned long msecs = ktime_to_ms(ktime_get()) -
1491 ktime_to_ms(sc->sc_tv_timer);
1492 #else
1493 unsigned long msecs = o2net_idle_timeout();
1494 #endif
1495
1496 printk(KERN_NOTICE "o2net: Connection to " SC_NODEF_FMT " has been "
1497 "idle for %lu.%lu secs.\n",
1498 SC_NODEF_ARGS(sc), msecs / 1000, msecs % 1000);
1499
1500
1501
1502
1503
1504 atomic_set(&nn->nn_timeout, 1);
1505 o2quo_conn_err(o2net_num_from_nn(nn));
1506 queue_delayed_work(o2net_wq, &nn->nn_still_up,
1507 msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
1508
1509 o2net_sc_reset_idle_timer(sc);
1510
1511 }
1512
1513 static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
1514 {
1515 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
1516 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
1517 msecs_to_jiffies(o2net_keepalive_delay()));
1518 o2net_set_sock_timer(sc);
1519 mod_timer(&sc->sc_idle_timeout,
1520 jiffies + msecs_to_jiffies(o2net_idle_timeout()));
1521 }
1522
1523 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
1524 {
1525 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1526
1527
1528 if (atomic_read(&nn->nn_timeout)) {
1529 o2quo_conn_up(o2net_num_from_nn(nn));
1530 cancel_delayed_work(&nn->nn_still_up);
1531 atomic_set(&nn->nn_timeout, 0);
1532 }
1533
1534
1535 if (timer_pending(&sc->sc_idle_timeout))
1536 o2net_sc_reset_idle_timer(sc);
1537 }
1538
1539
1540
1541
1542
1543
1544 static void o2net_start_connect(struct work_struct *work)
1545 {
1546 struct o2net_node *nn =
1547 container_of(work, struct o2net_node, nn_connect_work.work);
1548 struct o2net_sock_container *sc = NULL;
1549 struct o2nm_node *node = NULL, *mynode = NULL;
1550 struct socket *sock = NULL;
1551 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1552 int ret = 0, stop;
1553 unsigned int timeout;
1554 unsigned int nofs_flag;
1555
1556
1557
1558
1559
1560 nofs_flag = memalloc_nofs_save();
1561
1562 if (o2nm_this_node() <= o2net_num_from_nn(nn))
1563 goto out;
1564
1565
1566 node = o2nm_get_node_by_num(o2net_num_from_nn(nn));
1567 if (node == NULL)
1568 goto out;
1569
1570 mynode = o2nm_get_node_by_num(o2nm_this_node());
1571 if (mynode == NULL)
1572 goto out;
1573
1574 spin_lock(&nn->nn_lock);
1575
1576
1577
1578
1579
1580
1581
1582 timeout = atomic_read(&nn->nn_timeout);
1583 stop = (nn->nn_sc ||
1584 (nn->nn_persistent_error &&
1585 (nn->nn_persistent_error != -ENOTCONN || timeout == 0)));
1586 spin_unlock(&nn->nn_lock);
1587 if (stop)
1588 goto out;
1589
1590 nn->nn_last_connect_attempt = jiffies;
1591
1592 sc = sc_alloc(node);
1593 if (sc == NULL) {
1594 mlog(0, "couldn't allocate sc\n");
1595 ret = -ENOMEM;
1596 goto out;
1597 }
1598
1599 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1600 if (ret < 0) {
1601 mlog(0, "can't create socket: %d\n", ret);
1602 goto out;
1603 }
1604 sc->sc_sock = sock;
1605
1606 sock->sk->sk_allocation = GFP_ATOMIC;
1607
1608 myaddr.sin_family = AF_INET;
1609 myaddr.sin_addr.s_addr = mynode->nd_ipv4_address;
1610 myaddr.sin_port = htons(0);
1611
1612 ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
1613 sizeof(myaddr));
1614 if (ret) {
1615 mlog(ML_ERROR, "bind failed with %d at address %pI4\n",
1616 ret, &mynode->nd_ipv4_address);
1617 goto out;
1618 }
1619
1620 tcp_sock_set_nodelay(sc->sc_sock->sk);
1621 tcp_sock_set_user_timeout(sock->sk, O2NET_TCP_USER_TIMEOUT);
1622
1623 o2net_register_callbacks(sc->sc_sock->sk, sc);
1624
1625 spin_lock(&nn->nn_lock);
1626
1627 o2net_set_nn_state(nn, sc, 0, 0);
1628 spin_unlock(&nn->nn_lock);
1629
1630 remoteaddr.sin_family = AF_INET;
1631 remoteaddr.sin_addr.s_addr = node->nd_ipv4_address;
1632 remoteaddr.sin_port = node->nd_ipv4_port;
1633
1634 ret = sc->sc_sock->ops->connect(sc->sc_sock,
1635 (struct sockaddr *)&remoteaddr,
1636 sizeof(remoteaddr),
1637 O_NONBLOCK);
1638 if (ret == -EINPROGRESS)
1639 ret = 0;
1640
1641 out:
1642 if (ret && sc) {
1643 printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT
1644 " failed with errno %d\n", SC_NODEF_ARGS(sc), ret);
1645
1646
1647 o2net_ensure_shutdown(nn, sc, 0);
1648 }
1649 if (sc)
1650 sc_put(sc);
1651 if (node)
1652 o2nm_node_put(node);
1653 if (mynode)
1654 o2nm_node_put(mynode);
1655
1656 memalloc_nofs_restore(nofs_flag);
1657 return;
1658 }
1659
1660 static void o2net_connect_expired(struct work_struct *work)
1661 {
1662 struct o2net_node *nn =
1663 container_of(work, struct o2net_node, nn_connect_expired.work);
1664
1665 spin_lock(&nn->nn_lock);
1666 if (!nn->nn_sc_valid) {
1667 printk(KERN_NOTICE "o2net: No connection established with "
1668 "node %u after %u.%u seconds, check network and"
1669 " cluster configuration.\n",
1670 o2net_num_from_nn(nn),
1671 o2net_idle_timeout() / 1000,
1672 o2net_idle_timeout() % 1000);
1673
1674 o2net_set_nn_state(nn, NULL, 0, 0);
1675 }
1676 spin_unlock(&nn->nn_lock);
1677 }
1678
1679 static void o2net_still_up(struct work_struct *work)
1680 {
1681 struct o2net_node *nn =
1682 container_of(work, struct o2net_node, nn_still_up.work);
1683
1684 o2quo_hb_still_up(o2net_num_from_nn(nn));
1685 }
1686
1687
1688
1689 void o2net_disconnect_node(struct o2nm_node *node)
1690 {
1691 struct o2net_node *nn = o2net_nn_from_num(node->nd_num);
1692
1693
1694 spin_lock(&nn->nn_lock);
1695 atomic_set(&nn->nn_timeout, 0);
1696 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1697 spin_unlock(&nn->nn_lock);
1698
1699 if (o2net_wq) {
1700 cancel_delayed_work(&nn->nn_connect_expired);
1701 cancel_delayed_work(&nn->nn_connect_work);
1702 cancel_delayed_work(&nn->nn_still_up);
1703 flush_workqueue(o2net_wq);
1704 }
1705 }
1706
1707 static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1708 void *data)
1709 {
1710 o2quo_hb_down(node_num);
1711
1712 if (!node)
1713 return;
1714
1715 if (node_num != o2nm_this_node())
1716 o2net_disconnect_node(node);
1717
1718 BUG_ON(atomic_read(&o2net_connected_peers) < 0);
1719 }
1720
1721 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
1722 void *data)
1723 {
1724 struct o2net_node *nn = o2net_nn_from_num(node_num);
1725
1726 o2quo_hb_up(node_num);
1727
1728 BUG_ON(!node);
1729
1730
1731 nn->nn_last_connect_attempt = jiffies -
1732 (msecs_to_jiffies(o2net_reconnect_delay()) + 1);
1733
1734 if (node_num != o2nm_this_node()) {
1735
1736
1737
1738
1739 spin_lock(&nn->nn_lock);
1740 atomic_set(&nn->nn_timeout, 0);
1741 if (nn->nn_persistent_error)
1742 o2net_set_nn_state(nn, NULL, 0, 0);
1743 spin_unlock(&nn->nn_lock);
1744 }
1745 }
1746
1747 void o2net_unregister_hb_callbacks(void)
1748 {
1749 o2hb_unregister_callback(NULL, &o2net_hb_up);
1750 o2hb_unregister_callback(NULL, &o2net_hb_down);
1751 }
1752
1753 int o2net_register_hb_callbacks(void)
1754 {
1755 int ret;
1756
1757 o2hb_setup_callback(&o2net_hb_down, O2HB_NODE_DOWN_CB,
1758 o2net_hb_node_down_cb, NULL, O2NET_HB_PRI);
1759 o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB,
1760 o2net_hb_node_up_cb, NULL, O2NET_HB_PRI);
1761
1762 ret = o2hb_register_callback(NULL, &o2net_hb_up);
1763 if (ret == 0)
1764 ret = o2hb_register_callback(NULL, &o2net_hb_down);
1765
1766 if (ret)
1767 o2net_unregister_hb_callbacks();
1768
1769 return ret;
1770 }
1771
1772
1773
1774 static int o2net_accept_one(struct socket *sock, int *more)
1775 {
1776 int ret;
1777 struct sockaddr_in sin;
1778 struct socket *new_sock = NULL;
1779 struct o2nm_node *node = NULL;
1780 struct o2nm_node *local_node = NULL;
1781 struct o2net_sock_container *sc = NULL;
1782 struct o2net_node *nn;
1783 unsigned int nofs_flag;
1784
1785
1786
1787
1788
1789 nofs_flag = memalloc_nofs_save();
1790
1791 BUG_ON(sock == NULL);
1792 *more = 0;
1793 ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
1794 sock->sk->sk_protocol, &new_sock);
1795 if (ret)
1796 goto out;
1797
1798 new_sock->type = sock->type;
1799 new_sock->ops = sock->ops;
1800 ret = sock->ops->accept(sock, new_sock, O_NONBLOCK, false);
1801 if (ret < 0)
1802 goto out;
1803
1804 *more = 1;
1805 new_sock->sk->sk_allocation = GFP_ATOMIC;
1806
1807 tcp_sock_set_nodelay(new_sock->sk);
1808 tcp_sock_set_user_timeout(new_sock->sk, O2NET_TCP_USER_TIMEOUT);
1809
1810 ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin, 1);
1811 if (ret < 0)
1812 goto out;
1813
1814 node = o2nm_get_node_by_ip(sin.sin_addr.s_addr);
1815 if (node == NULL) {
1816 printk(KERN_NOTICE "o2net: Attempt to connect from unknown "
1817 "node at %pI4:%d\n", &sin.sin_addr.s_addr,
1818 ntohs(sin.sin_port));
1819 ret = -EINVAL;
1820 goto out;
1821 }
1822
1823 if (o2nm_this_node() >= node->nd_num) {
1824 local_node = o2nm_get_node_by_num(o2nm_this_node());
1825 if (local_node)
1826 printk(KERN_NOTICE "o2net: Unexpected connect attempt "
1827 "seen at node '%s' (%u, %pI4:%d) from "
1828 "node '%s' (%u, %pI4:%d)\n",
1829 local_node->nd_name, local_node->nd_num,
1830 &(local_node->nd_ipv4_address),
1831 ntohs(local_node->nd_ipv4_port),
1832 node->nd_name,
1833 node->nd_num, &sin.sin_addr.s_addr,
1834 ntohs(sin.sin_port));
1835 ret = -EINVAL;
1836 goto out;
1837 }
1838
1839
1840
1841 if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) {
1842 mlog(ML_CONN, "attempt to connect from node '%s' at "
1843 "%pI4:%d but it isn't heartbeating\n",
1844 node->nd_name, &sin.sin_addr.s_addr,
1845 ntohs(sin.sin_port));
1846 ret = -EINVAL;
1847 goto out;
1848 }
1849
1850 nn = o2net_nn_from_num(node->nd_num);
1851
1852 spin_lock(&nn->nn_lock);
1853 if (nn->nn_sc)
1854 ret = -EBUSY;
1855 else
1856 ret = 0;
1857 spin_unlock(&nn->nn_lock);
1858 if (ret) {
1859 printk(KERN_NOTICE "o2net: Attempt to connect from node '%s' "
1860 "at %pI4:%d but it already has an open connection\n",
1861 node->nd_name, &sin.sin_addr.s_addr,
1862 ntohs(sin.sin_port));
1863 goto out;
1864 }
1865
1866 sc = sc_alloc(node);
1867 if (sc == NULL) {
1868 ret = -ENOMEM;
1869 goto out;
1870 }
1871
1872 sc->sc_sock = new_sock;
1873 new_sock = NULL;
1874
1875 spin_lock(&nn->nn_lock);
1876 atomic_set(&nn->nn_timeout, 0);
1877 o2net_set_nn_state(nn, sc, 0, 0);
1878 spin_unlock(&nn->nn_lock);
1879
1880 o2net_register_callbacks(sc->sc_sock->sk, sc);
1881 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1882
1883 o2net_initialize_handshake();
1884 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1885
1886 out:
1887 if (new_sock)
1888 sock_release(new_sock);
1889 if (node)
1890 o2nm_node_put(node);
1891 if (local_node)
1892 o2nm_node_put(local_node);
1893 if (sc)
1894 sc_put(sc);
1895
1896 memalloc_nofs_restore(nofs_flag);
1897 return ret;
1898 }
1899
1900
1901
1902
1903
1904
1905
1906 static void o2net_accept_many(struct work_struct *work)
1907 {
1908 struct socket *sock = o2net_listen_sock;
1909 int more;
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923 for (;;) {
1924 o2net_accept_one(sock, &more);
1925 if (!more)
1926 break;
1927 cond_resched();
1928 }
1929 }
1930
1931 static void o2net_listen_data_ready(struct sock *sk)
1932 {
1933 void (*ready)(struct sock *sk);
1934
1935 read_lock_bh(&sk->sk_callback_lock);
1936 ready = sk->sk_user_data;
1937 if (ready == NULL) {
1938 ready = sk->sk_data_ready;
1939 goto out;
1940 }
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955 if (sk->sk_state == TCP_LISTEN) {
1956 queue_work(o2net_wq, &o2net_listen_work);
1957 } else {
1958 ready = NULL;
1959 }
1960
1961 out:
1962 read_unlock_bh(&sk->sk_callback_lock);
1963 if (ready != NULL)
1964 ready(sk);
1965 }
1966
1967 static int o2net_open_listening_sock(__be32 addr, __be16 port)
1968 {
1969 struct socket *sock = NULL;
1970 int ret;
1971 struct sockaddr_in sin = {
1972 .sin_family = PF_INET,
1973 .sin_addr = { .s_addr = addr },
1974 .sin_port = port,
1975 };
1976
1977 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1978 if (ret < 0) {
1979 printk(KERN_ERR "o2net: Error %d while creating socket\n", ret);
1980 goto out;
1981 }
1982
1983 sock->sk->sk_allocation = GFP_ATOMIC;
1984
1985 write_lock_bh(&sock->sk->sk_callback_lock);
1986 sock->sk->sk_user_data = sock->sk->sk_data_ready;
1987 sock->sk->sk_data_ready = o2net_listen_data_ready;
1988 write_unlock_bh(&sock->sk->sk_callback_lock);
1989
1990 o2net_listen_sock = sock;
1991 INIT_WORK(&o2net_listen_work, o2net_accept_many);
1992
1993 sock->sk->sk_reuse = SK_CAN_REUSE;
1994 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
1995 if (ret < 0) {
1996 printk(KERN_ERR "o2net: Error %d while binding socket at "
1997 "%pI4:%u\n", ret, &addr, ntohs(port));
1998 goto out;
1999 }
2000
2001 ret = sock->ops->listen(sock, 64);
2002 if (ret < 0)
2003 printk(KERN_ERR "o2net: Error %d while listening on %pI4:%u\n",
2004 ret, &addr, ntohs(port));
2005
2006 out:
2007 if (ret) {
2008 o2net_listen_sock = NULL;
2009 if (sock)
2010 sock_release(sock);
2011 }
2012 return ret;
2013 }
2014
2015
2016
2017
2018
2019
2020
2021
2022 int o2net_start_listening(struct o2nm_node *node)
2023 {
2024 int ret = 0;
2025
2026 BUG_ON(o2net_wq != NULL);
2027 BUG_ON(o2net_listen_sock != NULL);
2028
2029 mlog(ML_KTHREAD, "starting o2net thread...\n");
2030 o2net_wq = alloc_ordered_workqueue("o2net", WQ_MEM_RECLAIM);
2031 if (o2net_wq == NULL) {
2032 mlog(ML_ERROR, "unable to launch o2net thread\n");
2033 return -ENOMEM;
2034 }
2035
2036 ret = o2net_open_listening_sock(node->nd_ipv4_address,
2037 node->nd_ipv4_port);
2038 if (ret) {
2039 destroy_workqueue(o2net_wq);
2040 o2net_wq = NULL;
2041 } else
2042 o2quo_conn_up(node->nd_num);
2043
2044 return ret;
2045 }
2046
2047
2048
2049 void o2net_stop_listening(struct o2nm_node *node)
2050 {
2051 struct socket *sock = o2net_listen_sock;
2052 size_t i;
2053
2054 BUG_ON(o2net_wq == NULL);
2055 BUG_ON(o2net_listen_sock == NULL);
2056
2057
2058 write_lock_bh(&sock->sk->sk_callback_lock);
2059 sock->sk->sk_data_ready = sock->sk->sk_user_data;
2060 sock->sk->sk_user_data = NULL;
2061 write_unlock_bh(&sock->sk->sk_callback_lock);
2062
2063 for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
2064 struct o2nm_node *node = o2nm_get_node_by_num(i);
2065 if (node) {
2066 o2net_disconnect_node(node);
2067 o2nm_node_put(node);
2068 }
2069 }
2070
2071
2072 mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");
2073 destroy_workqueue(o2net_wq);
2074 o2net_wq = NULL;
2075
2076 sock_release(o2net_listen_sock);
2077 o2net_listen_sock = NULL;
2078
2079 o2quo_conn_err(node->nd_num);
2080 }
2081
2082
2083
2084 int o2net_init(void)
2085 {
2086 unsigned long i;
2087
2088 o2quo_init();
2089
2090 o2net_debugfs_init();
2091
2092 o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
2093 o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
2094 o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
2095 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp)
2096 goto out;
2097
2098 o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);
2099 o2net_hand->connector_id = cpu_to_be64(1);
2100
2101 o2net_keep_req->magic = cpu_to_be16(O2NET_MSG_KEEP_REQ_MAGIC);
2102 o2net_keep_resp->magic = cpu_to_be16(O2NET_MSG_KEEP_RESP_MAGIC);
2103
2104 for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
2105 struct o2net_node *nn = o2net_nn_from_num(i);
2106
2107 atomic_set(&nn->nn_timeout, 0);
2108 spin_lock_init(&nn->nn_lock);
2109 INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect);
2110 INIT_DELAYED_WORK(&nn->nn_connect_expired,
2111 o2net_connect_expired);
2112 INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up);
2113
2114 nn->nn_persistent_error = -ENOTCONN;
2115 init_waitqueue_head(&nn->nn_sc_wq);
2116 idr_init(&nn->nn_status_idr);
2117 INIT_LIST_HEAD(&nn->nn_status_list);
2118 }
2119
2120 return 0;
2121
2122 out:
2123 kfree(o2net_hand);
2124 kfree(o2net_keep_req);
2125 kfree(o2net_keep_resp);
2126 o2net_debugfs_exit();
2127 o2quo_exit();
2128 return -ENOMEM;
2129 }
2130
2131 void o2net_exit(void)
2132 {
2133 o2quo_exit();
2134 kfree(o2net_hand);
2135 kfree(o2net_keep_req);
2136 kfree(o2net_keep_resp);
2137 o2net_debugfs_exit();
2138 }