0001
0002 #include <linux/ceph/ceph_debug.h>
0003
0004 #include <linux/crc32c.h>
0005 #include <linux/ctype.h>
0006 #include <linux/highmem.h>
0007 #include <linux/inet.h>
0008 #include <linux/kthread.h>
0009 #include <linux/net.h>
0010 #include <linux/nsproxy.h>
0011 #include <linux/sched/mm.h>
0012 #include <linux/slab.h>
0013 #include <linux/socket.h>
0014 #include <linux/string.h>
0015 #ifdef CONFIG_BLOCK
0016 #include <linux/bio.h>
0017 #endif
0018 #include <linux/dns_resolver.h>
0019 #include <net/tcp.h>
0020
0021 #include <linux/ceph/ceph_features.h>
0022 #include <linux/ceph/libceph.h>
0023 #include <linux/ceph/messenger.h>
0024 #include <linux/ceph/decode.h>
0025 #include <linux/ceph/pagelist.h>
0026 #include <linux/export.h>
0027
0028 /*
0029  * Ceph uses the messenger to exchange ceph_msg messages with other
0030  * hosts in the system.  The messenger provides ordered and reliable
0031  * delivery.  We tolerate TCP disconnects by reconnecting (with
0032  * exponential backoff) in the case of a fault (disconnection, bad
0033  * crc, protocol error).  Acks allow sent messages to be discarded by
0034  * the sender.
0035  */
0036
0037 /*
0038  * We track the state of the socket on a given connection using the
0039  * CON_SOCK_STATE_* values below.  Transitions to a new socket state
0040  * go through a con_sock_state_*() helper that warns if the previous
0041  * state is unexpected:
0042  *
0043  *   NEW         transient initial state (assumed to be 0)
0044  *   CLOSED      initialized, but no socket (and no TCP connection)
0045  *   CONNECTING  socket created, TCP connect initiated
0046  *   CONNECTED   TCP connection established
0047  *   CLOSING     socket event received; awaiting close
0048  *
0049  * These are the values stored in ceph_connection->sock_state.
0050  */
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079 #define CON_SOCK_STATE_NEW 0
0080 #define CON_SOCK_STATE_CLOSED 1
0081 #define CON_SOCK_STATE_CONNECTING 2
0082 #define CON_SOCK_STATE_CONNECTED 3
0083 #define CON_SOCK_STATE_CLOSING 4
0084
0085 static bool con_flag_valid(unsigned long con_flag)
0086 {
0087 switch (con_flag) {
0088 case CEPH_CON_F_LOSSYTX:
0089 case CEPH_CON_F_KEEPALIVE_PENDING:
0090 case CEPH_CON_F_WRITE_PENDING:
0091 case CEPH_CON_F_SOCK_CLOSED:
0092 case CEPH_CON_F_BACKOFF:
0093 return true;
0094 default:
0095 return false;
0096 }
0097 }
0098
0099 void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
0100 {
0101 BUG_ON(!con_flag_valid(con_flag));
0102
0103 clear_bit(con_flag, &con->flags);
0104 }
0105
0106 void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
0107 {
0108 BUG_ON(!con_flag_valid(con_flag));
0109
0110 set_bit(con_flag, &con->flags);
0111 }
0112
0113 bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
0114 {
0115 BUG_ON(!con_flag_valid(con_flag));
0116
0117 return test_bit(con_flag, &con->flags);
0118 }
0119
0120 bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
0121 unsigned long con_flag)
0122 {
0123 BUG_ON(!con_flag_valid(con_flag));
0124
0125 return test_and_clear_bit(con_flag, &con->flags);
0126 }
0127
0128 bool ceph_con_flag_test_and_set(struct ceph_connection *con,
0129 unsigned long con_flag)
0130 {
0131 BUG_ON(!con_flag_valid(con_flag));
0132
0133 return test_and_set_bit(con_flag, &con->flags);
0134 }
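/*
 * Note: the ceph_con_flag_*() helpers above are thin wrappers around the
 * generic bitops that additionally validate the flag against the known
 * CEPH_CON_F_* set.  Callers later in this file use them like, for
 * example:
 *
 *	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
 *		queue_con(con);
 *
 * so that write work is only queued once per pending-write transition.
 */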
0135
0136
0137
0138 static struct kmem_cache *ceph_msg_cache;
0139
0140 #ifdef CONFIG_LOCKDEP
0141 static struct lock_class_key socket_class;
0142 #endif
0143
0144 static void queue_con(struct ceph_connection *con);
0145 static void cancel_con(struct ceph_connection *con);
0146 static void ceph_con_workfn(struct work_struct *);
0147 static void con_fault(struct ceph_connection *con);
0148
0149 /*
0150  * Nicely render a sockaddr as a string.  An array of formatted
0151  * strings is used, to approximate reentrancy.
0152  */
0153 #define ADDR_STR_COUNT_LOG 5
0154 #define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG)
0155 #define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1)
0156 #define MAX_ADDR_STR_LEN 64
0157
0158 static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
0159 static atomic_t addr_str_seq = ATOMIC_INIT(0);
0160
0161 struct page *ceph_zero_page;
0162
0163 const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
0164 {
0165 int i;
0166 char *s;
0167 struct sockaddr_storage ss = addr->in_addr;
0168 struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
0169 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
0170
0171 i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
0172 s = addr_str[i];
0173
0174 switch (ss.ss_family) {
0175 case AF_INET:
0176 snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
0177 le32_to_cpu(addr->type), &in4->sin_addr,
0178 ntohs(in4->sin_port));
0179 break;
0180
0181 case AF_INET6:
0182 snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
0183 le32_to_cpu(addr->type), &in6->sin6_addr,
0184 ntohs(in6->sin6_port));
0185 break;
0186
0187 default:
0188 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
0189 ss.ss_family);
0190 }
0191
0192 return s;
0193 }
0194 EXPORT_SYMBOL(ceph_pr_addr);
0195
0196 void ceph_encode_my_addr(struct ceph_messenger *msgr)
0197 {
0198 if (!ceph_msgr2(from_msgr(msgr))) {
0199 memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
0200 sizeof(msgr->my_enc_addr));
0201 ceph_encode_banner_addr(&msgr->my_enc_addr);
0202 }
0203 }
0204
0205 /*
0206  * Work queue for all reading and writing to/from the socket.
0207  */
0208 static struct workqueue_struct *ceph_msgr_wq;
0209
0210 static int ceph_msgr_slab_init(void)
0211 {
0212 BUG_ON(ceph_msg_cache);
0213 ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
0214 if (!ceph_msg_cache)
0215 return -ENOMEM;
0216
0217 return 0;
0218 }
0219
0220 static void ceph_msgr_slab_exit(void)
0221 {
0222 BUG_ON(!ceph_msg_cache);
0223 kmem_cache_destroy(ceph_msg_cache);
0224 ceph_msg_cache = NULL;
0225 }
0226
0227 static void _ceph_msgr_exit(void)
0228 {
0229 if (ceph_msgr_wq) {
0230 destroy_workqueue(ceph_msgr_wq);
0231 ceph_msgr_wq = NULL;
0232 }
0233
0234 BUG_ON(!ceph_zero_page);
0235 put_page(ceph_zero_page);
0236 ceph_zero_page = NULL;
0237
0238 ceph_msgr_slab_exit();
0239 }
0240
0241 int __init ceph_msgr_init(void)
0242 {
0243 if (ceph_msgr_slab_init())
0244 return -ENOMEM;
0245
0246 BUG_ON(ceph_zero_page);
0247 ceph_zero_page = ZERO_PAGE(0);
0248 get_page(ceph_zero_page);
0249
0250
0251
0252
0253
0254 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
0255 if (ceph_msgr_wq)
0256 return 0;
0257
0258 pr_err("msgr_init failed to create workqueue\n");
0259 _ceph_msgr_exit();
0260
0261 return -ENOMEM;
0262 }
0263
0264 void ceph_msgr_exit(void)
0265 {
0266 BUG_ON(ceph_msgr_wq == NULL);
0267
0268 _ceph_msgr_exit();
0269 }
0270
0271 void ceph_msgr_flush(void)
0272 {
0273 flush_workqueue(ceph_msgr_wq);
0274 }
0275 EXPORT_SYMBOL(ceph_msgr_flush);
0276
0277
0278
0279 static void con_sock_state_init(struct ceph_connection *con)
0280 {
0281 int old_state;
0282
0283 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
0284 if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
0285 printk("%s: unexpected old state %d\n", __func__, old_state);
0286 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
0287 CON_SOCK_STATE_CLOSED);
0288 }
0289
0290 static void con_sock_state_connecting(struct ceph_connection *con)
0291 {
0292 int old_state;
0293
0294 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
0295 if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
0296 printk("%s: unexpected old state %d\n", __func__, old_state);
0297 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
0298 CON_SOCK_STATE_CONNECTING);
0299 }
0300
0301 static void con_sock_state_connected(struct ceph_connection *con)
0302 {
0303 int old_state;
0304
0305 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
0306 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
0307 printk("%s: unexpected old state %d\n", __func__, old_state);
0308 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
0309 CON_SOCK_STATE_CONNECTED);
0310 }
0311
0312 static void con_sock_state_closing(struct ceph_connection *con)
0313 {
0314 int old_state;
0315
0316 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
0317 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
0318 old_state != CON_SOCK_STATE_CONNECTED &&
0319 old_state != CON_SOCK_STATE_CLOSING))
0320 printk("%s: unexpected old state %d\n", __func__, old_state);
0321 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
0322 CON_SOCK_STATE_CLOSING);
0323 }
0324
0325 static void con_sock_state_closed(struct ceph_connection *con)
0326 {
0327 int old_state;
0328
0329 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
0330 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
0331 old_state != CON_SOCK_STATE_CLOSING &&
0332 old_state != CON_SOCK_STATE_CONNECTING &&
0333 old_state != CON_SOCK_STATE_CLOSED))
0334 printk("%s: unexpected old state %d\n", __func__, old_state);
0335 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
0336 CON_SOCK_STATE_CLOSED);
0337 }
0338
0339 /*
0340  * Socket callback functions.
0341  */
0342
0343 /* data available on socket */
0344 static void ceph_sock_data_ready(struct sock *sk)
0345 {
0346 struct ceph_connection *con = sk->sk_user_data;
0347 if (atomic_read(&con->msgr->stopping)) {
0348 return;
0349 }
0350
0351 if (sk->sk_state != TCP_CLOSE_WAIT) {
0352 dout("%s %p state = %d, queueing work\n", __func__,
0353 con, con->state);
0354 queue_con(con);
0355 }
0356 }
0357
0358
0359 static void ceph_sock_write_space(struct sock *sk)
0360 {
0361 struct ceph_connection *con = sk->sk_user_data;
0362
0363 /*
0364  * Only queue write work if there is data we want to write and there
0365  * is sufficient space in the socket buffer to accept more data.
0366  * Clear SOCK_NOSPACE so that ceph_sock_write_space() doesn't get
0367  * called again until the write path fills the socket buffer.  See
0368  * net/core/stream.c:sk_stream_write_space().
0369  */
0370 if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
0371 if (sk_stream_is_writeable(sk)) {
0372 dout("%s %p queueing write work\n", __func__, con);
0373 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
0374 queue_con(con);
0375 }
0376 } else {
0377 dout("%s %p nothing to write\n", __func__, con);
0378 }
0379 }
0380
0381
0382 static void ceph_sock_state_change(struct sock *sk)
0383 {
0384 struct ceph_connection *con = sk->sk_user_data;
0385
0386 dout("%s %p state = %d sk_state = %u\n", __func__,
0387 con, con->state, sk->sk_state);
0388
0389 switch (sk->sk_state) {
0390 case TCP_CLOSE:
0391 dout("%s TCP_CLOSE\n", __func__);
0392 fallthrough;
0393 case TCP_CLOSE_WAIT:
0394 dout("%s TCP_CLOSE_WAIT\n", __func__);
0395 con_sock_state_closing(con);
0396 ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
0397 queue_con(con);
0398 break;
0399 case TCP_ESTABLISHED:
0400 dout("%s TCP_ESTABLISHED\n", __func__);
0401 con_sock_state_connected(con);
0402 queue_con(con);
0403 break;
0404 default:
0405 break;
0406 }
0407 }
0408
0409
0410
0411
0412 static void set_sock_callbacks(struct socket *sock,
0413 struct ceph_connection *con)
0414 {
0415 struct sock *sk = sock->sk;
0416 sk->sk_user_data = con;
0417 sk->sk_data_ready = ceph_sock_data_ready;
0418 sk->sk_write_space = ceph_sock_write_space;
0419 sk->sk_state_change = ceph_sock_state_change;
0420 }
0421
0422
0423
0424
0425
0426
0427
0428
0429
0430 int ceph_tcp_connect(struct ceph_connection *con)
0431 {
0432 struct sockaddr_storage ss = con->peer_addr.in_addr;
0433 struct socket *sock;
0434 unsigned int noio_flag;
0435 int ret;
0436
0437 dout("%s con %p peer_addr %s\n", __func__, con,
0438 ceph_pr_addr(&con->peer_addr));
0439 BUG_ON(con->sock);
0440
0441
0442 noio_flag = memalloc_noio_save();
0443 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
0444 SOCK_STREAM, IPPROTO_TCP, &sock);
0445 memalloc_noio_restore(noio_flag);
0446 if (ret)
0447 return ret;
0448 sock->sk->sk_allocation = GFP_NOFS;
0449
0450 #ifdef CONFIG_LOCKDEP
0451 lockdep_set_class(&sock->sk->sk_lock, &socket_class);
0452 #endif
0453
0454 set_sock_callbacks(sock, con);
0455
0456 con_sock_state_connecting(con);
0457 ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
0458 O_NONBLOCK);
0459 if (ret == -EINPROGRESS) {
0460 dout("connect %s EINPROGRESS sk_state = %u\n",
0461 ceph_pr_addr(&con->peer_addr),
0462 sock->sk->sk_state);
0463 } else if (ret < 0) {
0464 pr_err("connect %s error %d\n",
0465 ceph_pr_addr(&con->peer_addr), ret);
0466 sock_release(sock);
0467 return ret;
0468 }
0469
0470 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
0471 tcp_sock_set_nodelay(sock->sk);
0472
0473 con->sock = sock;
0474 return 0;
0475 }
0476
0477
0478
0479
0480 int ceph_con_close_socket(struct ceph_connection *con)
0481 {
0482 int rc = 0;
0483
0484 dout("%s con %p sock %p\n", __func__, con, con->sock);
0485 if (con->sock) {
0486 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
0487 sock_release(con->sock);
0488 con->sock = NULL;
0489 }
0490
0491 /*
0492  * Forcibly clear the CEPH_CON_F_SOCK_CLOSED flag.  It gets set
0493  * independently of the connection mutex, and we could have received
0494  * a socket close event before we had the chance to shut the socket
0495  * down ourselves.
0496  */
0497 ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
0498
0499 con_sock_state_closed(con);
0500 return rc;
0501 }
0502
0503 static void ceph_con_reset_protocol(struct ceph_connection *con)
0504 {
0505 dout("%s con %p\n", __func__, con);
0506
0507 ceph_con_close_socket(con);
0508 if (con->in_msg) {
0509 WARN_ON(con->in_msg->con != con);
0510 ceph_msg_put(con->in_msg);
0511 con->in_msg = NULL;
0512 }
0513 if (con->out_msg) {
0514 WARN_ON(con->out_msg->con != con);
0515 ceph_msg_put(con->out_msg);
0516 con->out_msg = NULL;
0517 }
0518 if (con->bounce_page) {
0519 __free_page(con->bounce_page);
0520 con->bounce_page = NULL;
0521 }
0522
0523 if (ceph_msgr2(from_msgr(con->msgr)))
0524 ceph_con_v2_reset_protocol(con);
0525 else
0526 ceph_con_v1_reset_protocol(con);
0527 }
0528
0529
0530
0531
0532
0533 static void ceph_msg_remove(struct ceph_msg *msg)
0534 {
0535 list_del_init(&msg->list_head);
0536
0537 ceph_msg_put(msg);
0538 }
0539
0540 static void ceph_msg_remove_list(struct list_head *head)
0541 {
0542 while (!list_empty(head)) {
0543 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
0544 list_head);
0545 ceph_msg_remove(msg);
0546 }
0547 }
0548
0549 void ceph_con_reset_session(struct ceph_connection *con)
0550 {
0551 dout("%s con %p\n", __func__, con);
0552
0553 WARN_ON(con->in_msg);
0554 WARN_ON(con->out_msg);
0555 ceph_msg_remove_list(&con->out_queue);
0556 ceph_msg_remove_list(&con->out_sent);
0557 con->out_seq = 0;
0558 con->in_seq = 0;
0559 con->in_seq_acked = 0;
0560
0561 if (ceph_msgr2(from_msgr(con->msgr)))
0562 ceph_con_v2_reset_session(con);
0563 else
0564 ceph_con_v1_reset_session(con);
0565 }
0566
0567
0568
0569
0570 void ceph_con_close(struct ceph_connection *con)
0571 {
0572 mutex_lock(&con->mutex);
0573 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
0574 con->state = CEPH_CON_S_CLOSED;
0575
0576 ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);
0577
0578 ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
0579 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
0580 ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
0581
0582 ceph_con_reset_protocol(con);
0583 ceph_con_reset_session(con);
0584 cancel_con(con);
0585 mutex_unlock(&con->mutex);
0586 }
0587 EXPORT_SYMBOL(ceph_con_close);
0588
0589
0590
0591
0592 void ceph_con_open(struct ceph_connection *con,
0593 __u8 entity_type, __u64 entity_num,
0594 struct ceph_entity_addr *addr)
0595 {
0596 mutex_lock(&con->mutex);
0597 dout("con_open %p %s\n", con, ceph_pr_addr(addr));
0598
0599 WARN_ON(con->state != CEPH_CON_S_CLOSED);
0600 con->state = CEPH_CON_S_PREOPEN;
0601
0602 con->peer_name.type = (__u8) entity_type;
0603 con->peer_name.num = cpu_to_le64(entity_num);
0604
0605 memcpy(&con->peer_addr, addr, sizeof(*addr));
0606 con->delay = 0;
0607 mutex_unlock(&con->mutex);
0608 queue_con(con);
0609 }
0610 EXPORT_SYMBOL(ceph_con_open);
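/*
 * Typical connection lifecycle, a rough sketch based only on the API in
 * this file (not a verbatim excerpt from it):
 *
 *	ceph_con_init(con, private, ops, msgr);   // state CLOSED
 *	ceph_con_open(con, type, num, addr);      // state PREOPEN, work queued
 *	ceph_con_send(con, msg);                  // queue an outgoing message
 *	...
 *	ceph_con_close(con);                      // back to CLOSED
 *
 * All state transitions beyond PREOPEN happen in ceph_con_workfn().
 */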
0611
0612
0613
0614
0615 bool ceph_con_opened(struct ceph_connection *con)
0616 {
0617 if (ceph_msgr2(from_msgr(con->msgr)))
0618 return ceph_con_v2_opened(con);
0619
0620 return ceph_con_v1_opened(con);
0621 }
0622
0623
0624
0625
0626 void ceph_con_init(struct ceph_connection *con, void *private,
0627 const struct ceph_connection_operations *ops,
0628 struct ceph_messenger *msgr)
0629 {
0630 dout("con_init %p\n", con);
0631 memset(con, 0, sizeof(*con));
0632 con->private = private;
0633 con->ops = ops;
0634 con->msgr = msgr;
0635
0636 con_sock_state_init(con);
0637
0638 mutex_init(&con->mutex);
0639 INIT_LIST_HEAD(&con->out_queue);
0640 INIT_LIST_HEAD(&con->out_sent);
0641 INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
0642
0643 con->state = CEPH_CON_S_CLOSED;
0644 }
0645 EXPORT_SYMBOL(ceph_con_init);
0646
0647 /*
0648  * We maintain a global counter to order connection attempts.  Get
0649  * a unique seq greater than @gt.
0650  */
0651 u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
0652 {
0653 u32 ret;
0654
0655 spin_lock(&msgr->global_seq_lock);
0656 if (msgr->global_seq < gt)
0657 msgr->global_seq = gt;
0658 ret = ++msgr->global_seq;
0659 spin_unlock(&msgr->global_seq_lock);
0660 return ret;
0661 }
0662
0663 /*
0664  * Discard messages that have been acked by the server.
0665  */
0666 void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
0667 {
0668 struct ceph_msg *msg;
0669 u64 seq;
0670
0671 dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
0672 while (!list_empty(&con->out_sent)) {
0673 msg = list_first_entry(&con->out_sent, struct ceph_msg,
0674 list_head);
0675 WARN_ON(msg->needs_out_seq);
0676 seq = le64_to_cpu(msg->hdr.seq);
0677 if (seq > ack_seq)
0678 break;
0679
0680 dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
0681 msg, seq);
0682 ceph_msg_remove(msg);
0683 }
0684 }
0685
0686 /*
0687  * Discard messages that have been requeued in con_fault(), up to
0688  * reconnect_seq.  This avoids gratuitously resending messages that
0689  * the server had received and handled prior to reconnect.
0690  */
0691 void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
0692 {
0693 struct ceph_msg *msg;
0694 u64 seq;
0695
0696 dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
0697 while (!list_empty(&con->out_queue)) {
0698 msg = list_first_entry(&con->out_queue, struct ceph_msg,
0699 list_head);
0700 if (msg->needs_out_seq)
0701 break;
0702 seq = le64_to_cpu(msg->hdr.seq);
0703 if (seq > reconnect_seq)
0704 break;
0705
0706 dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
0707 msg, seq);
0708 ceph_msg_remove(msg);
0709 }
0710 }
0711
0712 #ifdef CONFIG_BLOCK
0713
0714 /*
0715  * For a bio data item, a piece is whatever remains of the next
0716  * entry in the current bio iovec, or the first entry in the next
0717  * bio in the list.
0718  */
0719 static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
0720 size_t length)
0721 {
0722 struct ceph_msg_data *data = cursor->data;
0723 struct ceph_bio_iter *it = &cursor->bio_iter;
0724
0725 cursor->resid = min_t(size_t, length, data->bio_length);
0726 *it = data->bio_pos;
0727 if (cursor->resid < it->iter.bi_size)
0728 it->iter.bi_size = cursor->resid;
0729
0730 BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
0731 cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
0732 }
0733
0734 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
0735 size_t *page_offset,
0736 size_t *length)
0737 {
0738 struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
0739 cursor->bio_iter.iter);
0740
0741 *page_offset = bv.bv_offset;
0742 *length = bv.bv_len;
0743 return bv.bv_page;
0744 }
0745
0746 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
0747 size_t bytes)
0748 {
0749 struct ceph_bio_iter *it = &cursor->bio_iter;
0750 struct page *page = bio_iter_page(it->bio, it->iter);
0751
0752 BUG_ON(bytes > cursor->resid);
0753 BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
0754 cursor->resid -= bytes;
0755 bio_advance_iter(it->bio, &it->iter, bytes);
0756
0757 if (!cursor->resid) {
0758 BUG_ON(!cursor->last_piece);
0759 return false;
0760 }
0761
0762 if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
0763 page == bio_iter_page(it->bio, it->iter)))
0764 return false;
0765
0766 if (!it->iter.bi_size) {
0767 it->bio = it->bio->bi_next;
0768 it->iter = it->bio->bi_iter;
0769 if (cursor->resid < it->iter.bi_size)
0770 it->iter.bi_size = cursor->resid;
0771 }
0772
0773 BUG_ON(cursor->last_piece);
0774 BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
0775 cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
0776 return true;
0777 }
0778 #endif
0779
0780 static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
0781 size_t length)
0782 {
0783 struct ceph_msg_data *data = cursor->data;
0784 struct bio_vec *bvecs = data->bvec_pos.bvecs;
0785
0786 cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
0787 cursor->bvec_iter = data->bvec_pos.iter;
0788 cursor->bvec_iter.bi_size = cursor->resid;
0789
0790 BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
0791 cursor->last_piece =
0792 cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
0793 }
0794
0795 static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
0796 size_t *page_offset,
0797 size_t *length)
0798 {
0799 struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
0800 cursor->bvec_iter);
0801
0802 *page_offset = bv.bv_offset;
0803 *length = bv.bv_len;
0804 return bv.bv_page;
0805 }
0806
0807 static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
0808 size_t bytes)
0809 {
0810 struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
0811 struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);
0812
0813 BUG_ON(bytes > cursor->resid);
0814 BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
0815 cursor->resid -= bytes;
0816 bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
0817
0818 if (!cursor->resid) {
0819 BUG_ON(!cursor->last_piece);
0820 return false;
0821 }
0822
0823 if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
0824 page == bvec_iter_page(bvecs, cursor->bvec_iter)))
0825 return false;
0826
0827 BUG_ON(cursor->last_piece);
0828 BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
0829 cursor->last_piece =
0830 cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
0831 return true;
0832 }
0833
0834 /*
0835  * For a page array, a piece comes from the first page in the array
0836  * that has not already been fully consumed.
0837  */
0838 static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
0839 size_t length)
0840 {
0841 struct ceph_msg_data *data = cursor->data;
0842 int page_count;
0843
0844 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
0845
0846 BUG_ON(!data->pages);
0847 BUG_ON(!data->length);
0848
0849 cursor->resid = min(length, data->length);
0850 page_count = calc_pages_for(data->alignment, (u64)data->length);
0851 cursor->page_offset = data->alignment & ~PAGE_MASK;
0852 cursor->page_index = 0;
0853 BUG_ON(page_count > (int)USHRT_MAX);
0854 cursor->page_count = (unsigned short)page_count;
0855 BUG_ON(length > SIZE_MAX - cursor->page_offset);
0856 cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
0857 }
0858
0859 static struct page *
0860 ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
0861 size_t *page_offset, size_t *length)
0862 {
0863 struct ceph_msg_data *data = cursor->data;
0864
0865 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
0866
0867 BUG_ON(cursor->page_index >= cursor->page_count);
0868 BUG_ON(cursor->page_offset >= PAGE_SIZE);
0869
0870 *page_offset = cursor->page_offset;
0871 if (cursor->last_piece)
0872 *length = cursor->resid;
0873 else
0874 *length = PAGE_SIZE - *page_offset;
0875
0876 return data->pages[cursor->page_index];
0877 }
0878
0879 static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
0880 size_t bytes)
0881 {
0882 BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
0883
0884 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
0885
0886
0887
0888 cursor->resid -= bytes;
0889 cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
0890 if (!bytes || cursor->page_offset)
0891 return false;
0892
0893 if (!cursor->resid)
0894 return false;
0895
0896
0897
0898 BUG_ON(cursor->page_index >= cursor->page_count);
0899 cursor->page_index++;
0900 cursor->last_piece = cursor->resid <= PAGE_SIZE;
0901
0902 return true;
0903 }
0904
0905 /*
0906  * For a pagelist, a piece is whatever remains to be consumed in the
0907  * first page in the list, or the front of the next page.
0908  */
0909 static void
0910 ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
0911 size_t length)
0912 {
0913 struct ceph_msg_data *data = cursor->data;
0914 struct ceph_pagelist *pagelist;
0915 struct page *page;
0916
0917 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
0918
0919 pagelist = data->pagelist;
0920 BUG_ON(!pagelist);
0921
0922 if (!length)
0923 return;
0924
0925 BUG_ON(list_empty(&pagelist->head));
0926 page = list_first_entry(&pagelist->head, struct page, lru);
0927
0928 cursor->resid = min(length, pagelist->length);
0929 cursor->page = page;
0930 cursor->offset = 0;
0931 cursor->last_piece = cursor->resid <= PAGE_SIZE;
0932 }
0933
0934 static struct page *
0935 ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
0936 size_t *page_offset, size_t *length)
0937 {
0938 struct ceph_msg_data *data = cursor->data;
0939 struct ceph_pagelist *pagelist;
0940
0941 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
0942
0943 pagelist = data->pagelist;
0944 BUG_ON(!pagelist);
0945
0946 BUG_ON(!cursor->page);
0947 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
0948
0949
0950 *page_offset = cursor->offset & ~PAGE_MASK;
0951 if (cursor->last_piece)
0952 *length = cursor->resid;
0953 else
0954 *length = PAGE_SIZE - *page_offset;
0955
0956 return cursor->page;
0957 }
0958
0959 static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
0960 size_t bytes)
0961 {
0962 struct ceph_msg_data *data = cursor->data;
0963 struct ceph_pagelist *pagelist;
0964
0965 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
0966
0967 pagelist = data->pagelist;
0968 BUG_ON(!pagelist);
0969
0970 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
0971 BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
0972
0973
0974
0975 cursor->resid -= bytes;
0976 cursor->offset += bytes;
0977
0978 if (!bytes || cursor->offset & ~PAGE_MASK)
0979 return false;
0980
0981 if (!cursor->resid)
0982 return false;
0983
0984
0985
0986 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
0987 cursor->page = list_next_entry(cursor->page, lru);
0988 cursor->last_piece = cursor->resid <= PAGE_SIZE;
0989
0990 return true;
0991 }
0992
0993 /*
0994  * Message data is handled (sent or received) in pieces, where each
0995  * piece resides on a single page.  The network layer might not
0996  * consume an entire piece at once.  A data item's cursor keeps
0997  * track of which piece is next to process and how much remains to
0998  * be processed in that piece.  It also tracks whether the current
0999  * piece is the last one in the data item.
1000  */
1001 static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1002 {
1003 size_t length = cursor->total_resid;
1004
1005 switch (cursor->data->type) {
1006 case CEPH_MSG_DATA_PAGELIST:
1007 ceph_msg_data_pagelist_cursor_init(cursor, length);
1008 break;
1009 case CEPH_MSG_DATA_PAGES:
1010 ceph_msg_data_pages_cursor_init(cursor, length);
1011 break;
1012 #ifdef CONFIG_BLOCK
1013 case CEPH_MSG_DATA_BIO:
1014 ceph_msg_data_bio_cursor_init(cursor, length);
1015 break;
1016 #endif
1017 case CEPH_MSG_DATA_BVECS:
1018 ceph_msg_data_bvecs_cursor_init(cursor, length);
1019 break;
1020 case CEPH_MSG_DATA_NONE:
1021 default:
1022
1023 break;
1024 }
1025 cursor->need_crc = true;
1026 }
1027
1028 void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
1029 struct ceph_msg *msg, size_t length)
1030 {
1031 BUG_ON(!length);
1032 BUG_ON(length > msg->data_length);
1033 BUG_ON(!msg->num_data_items);
1034
1035 cursor->total_resid = length;
1036 cursor->data = msg->data;
1037
1038 __ceph_msg_data_cursor_init(cursor);
1039 }
1040
1041 /*
1042  * Return the page containing the next piece to process for a given
1043  * data item, and supply the page offset and length of that piece.
1044  * Indicate whether this is the last piece in this data item.
1045  */
1046 struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
1047 size_t *page_offset, size_t *length,
1048 bool *last_piece)
1049 {
1050 struct page *page;
1051
1052 switch (cursor->data->type) {
1053 case CEPH_MSG_DATA_PAGELIST:
1054 page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
1055 break;
1056 case CEPH_MSG_DATA_PAGES:
1057 page = ceph_msg_data_pages_next(cursor, page_offset, length);
1058 break;
1059 #ifdef CONFIG_BLOCK
1060 case CEPH_MSG_DATA_BIO:
1061 page = ceph_msg_data_bio_next(cursor, page_offset, length);
1062 break;
1063 #endif
1064 case CEPH_MSG_DATA_BVECS:
1065 page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
1066 break;
1067 case CEPH_MSG_DATA_NONE:
1068 default:
1069 page = NULL;
1070 break;
1071 }
1072
1073 BUG_ON(!page);
1074 BUG_ON(*page_offset + *length > PAGE_SIZE);
1075 BUG_ON(!*length);
1076 BUG_ON(*length > cursor->resid);
1077 if (last_piece)
1078 *last_piece = cursor->last_piece;
1079
1080 return page;
1081 }
1082
1083 /*
1084  * Advance the cursor past @bytes of the current piece, and note
1085  * whether the next piece starts fresh (and so needs a new CRC).
1086  */
1087 void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
1088 {
1089 bool new_piece;
1090
1091 BUG_ON(bytes > cursor->resid);
1092 switch (cursor->data->type) {
1093 case CEPH_MSG_DATA_PAGELIST:
1094 new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
1095 break;
1096 case CEPH_MSG_DATA_PAGES:
1097 new_piece = ceph_msg_data_pages_advance(cursor, bytes);
1098 break;
1099 #ifdef CONFIG_BLOCK
1100 case CEPH_MSG_DATA_BIO:
1101 new_piece = ceph_msg_data_bio_advance(cursor, bytes);
1102 break;
1103 #endif
1104 case CEPH_MSG_DATA_BVECS:
1105 new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
1106 break;
1107 case CEPH_MSG_DATA_NONE:
1108 default:
1109 BUG();
1110 break;
1111 }
1112 cursor->total_resid -= bytes;
1113
1114 if (!cursor->resid && cursor->total_resid) {
1115 WARN_ON(!cursor->last_piece);
1116 cursor->data++;
1117 __ceph_msg_data_cursor_init(cursor);
1118 new_piece = true;
1119 }
1120 cursor->need_crc = new_piece;
1121 }
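/*
 * Illustrative sketch (not part of the original file): the read/write
 * paths drive a data cursor roughly as follows, processing one piece
 * (at most one page) per iteration:
 *
 *	ceph_msg_data_cursor_init(&cursor, msg, msg->data_length);
 *	while (cursor.total_resid) {
 *		page = ceph_msg_data_next(&cursor, &off, &len, NULL);
 *		ret = do_io(page, off, len);      // hypothetical helper
 *		ceph_msg_data_advance(&cursor, ret);
 *	}
 */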
1122
1123 u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
1124 unsigned int length)
1125 {
1126 char *kaddr;
1127
1128 kaddr = kmap(page);
1129 BUG_ON(kaddr == NULL);
1130 crc = crc32c(crc, kaddr + page_offset, length);
1131 kunmap(page);
1132
1133 return crc;
1134 }
1135
1136 bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
1137 {
1138 struct sockaddr_storage ss = addr->in_addr;
1139 struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
1140 struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;
1141
1142 switch (ss.ss_family) {
1143 case AF_INET:
1144 return addr4->s_addr == htonl(INADDR_ANY);
1145 case AF_INET6:
1146 return ipv6_addr_any(addr6);
1147 default:
1148 return true;
1149 }
1150 }
1151
1152 int ceph_addr_port(const struct ceph_entity_addr *addr)
1153 {
1154 switch (get_unaligned(&addr->in_addr.ss_family)) {
1155 case AF_INET:
1156 return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
1157 case AF_INET6:
1158 return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
1159 }
1160 return 0;
1161 }
1162
1163 void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
1164 {
1165 switch (get_unaligned(&addr->in_addr.ss_family)) {
1166 case AF_INET:
1167 put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
1168 break;
1169 case AF_INET6:
1170 put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
1171 break;
1172 }
1173 }
1174
1175 /*
1176  * Unlike the other *_pton() helpers, zero indicates success here.
1177  */
1178 static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
1179 char delim, const char **ipend)
1180 {
1181 memset(&addr->in_addr, 0, sizeof(addr->in_addr));
1182
1183 if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
1184 put_unaligned(AF_INET, &addr->in_addr.ss_family);
1185 return 0;
1186 }
1187
1188 if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
1189 put_unaligned(AF_INET6, &addr->in_addr.ss_family);
1190 return 0;
1191 }
1192
1193 return -EINVAL;
1194 }
1195
1196
1197
1198
1199 #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
1200 static int ceph_dns_resolve_name(const char *name, size_t namelen,
1201 struct ceph_entity_addr *addr, char delim, const char **ipend)
1202 {
1203 const char *end, *delim_p;
1204 char *colon_p, *ip_addr = NULL;
1205 int ip_len, ret;
1206
1207
1208
1209
1210
1211 delim_p = memchr(name, delim, namelen);
1212 colon_p = memchr(name, ':', namelen);
1213
1214 if (delim_p && colon_p)
1215 end = delim_p < colon_p ? delim_p : colon_p;
1216 else if (!delim_p && colon_p)
1217 end = colon_p;
1218 else {
1219 end = delim_p;
1220 if (!end)
1221 end = name + namelen;
1222 }
1223
1224 if (end <= name)
1225 return -EINVAL;
1226
1227
1228 ip_len = dns_query(current->nsproxy->net_ns,
1229 NULL, name, end - name, NULL, &ip_addr, NULL, false);
1230 if (ip_len > 0)
1231 ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
1232 else
1233 ret = -ESRCH;
1234
1235 kfree(ip_addr);
1236
1237 *ipend = end;
1238
1239 pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
1240 ret, ret ? "failed" : ceph_pr_addr(addr));
1241
1242 return ret;
1243 }
1244 #else
1245 static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
1246 struct ceph_entity_addr *addr, char delim, const char **ipend)
1247 {
1248 return -EINVAL;
1249 }
1250 #endif
1251
1252 /*
1253  * Parse a server name (IP address or hostname).  If a valid IP
1254  * address is not found, try to resolve the name via a DNS upcall.
1255  */
1256 static int ceph_parse_server_name(const char *name, size_t namelen,
1257 struct ceph_entity_addr *addr, char delim, const char **ipend)
1258 {
1259 int ret;
1260
1261 ret = ceph_pton(name, namelen, addr, delim, ipend);
1262 if (ret)
1263 ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
1264
1265 return ret;
1266 }
1267
1268 /*
1269  * Parse an ip[:port] list into an addr array.  Use the default
1270  * Ceph monitor port if a port isn't specified.
1271  */
1272 int ceph_parse_ips(const char *c, const char *end,
1273 struct ceph_entity_addr *addr,
1274 int max_count, int *count, char delim)
1275 {
1276 int i, ret = -EINVAL;
1277 const char *p = c;
1278
1279 dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1280 for (i = 0; i < max_count; i++) {
1281 char cur_delim = delim;
1282 const char *ipend;
1283 int port;
1284
1285 if (*p == '[') {
1286 cur_delim = ']';
1287 p++;
1288 }
1289
1290 ret = ceph_parse_server_name(p, end - p, &addr[i], cur_delim,
1291 &ipend);
1292 if (ret)
1293 goto bad;
1294 ret = -EINVAL;
1295
1296 p = ipend;
1297
1298 if (cur_delim == ']') {
1299 if (*p != ']') {
1300 dout("missing matching ']'\n");
1301 goto bad;
1302 }
1303 p++;
1304 }
1305
1306
1307 if (p < end && *p == ':') {
1308 port = 0;
1309 p++;
1310 while (p < end && *p >= '0' && *p <= '9') {
1311 port = (port * 10) + (*p - '0');
1312 p++;
1313 }
1314 if (port == 0)
1315 port = CEPH_MON_PORT;
1316 else if (port > 65535)
1317 goto bad;
1318 } else {
1319 port = CEPH_MON_PORT;
1320 }
1321
1322 ceph_addr_set_port(&addr[i], port);
1323
1324 /*
1325  * We want addr->type to follow the ms_mode option, but options
1326  * are normally parsed after the monitor addresses.  Rather than
1327  * complicating parsing here, set the type to LEGACY and let the
1328  * callers override it later (e.g. for msgr2).
1329  */
1330
1331 addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
1332 addr[i].nonce = 0;
1333
1334 dout("%s got %s\n", __func__, ceph_pr_addr(&addr[i]));
1335
1336 if (p == end)
1337 break;
1338 if (*p != delim)
1339 goto bad;
1340 p++;
1341 }
1342
1343 if (p != end)
1344 goto bad;
1345
1346 if (count)
1347 *count = i + 1;
1348 return 0;
1349
1350 bad:
1351 return ret;
1352 }
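/*
 * Example inputs accepted by ceph_parse_ips() (illustrative):
 *
 *	"1.2.3.4"                 -> port defaults to CEPH_MON_PORT
 *	"1.2.3.4:6789"            -> explicit port
 *	"[2001:db8::1]:6789"      -> IPv6 addresses must be bracketed
 *	"1.2.3.4:6789,5.6.7.8"    -> multiple entries, separated by @delim
 *
 * Hostnames are also accepted when CONFIG_CEPH_LIB_USE_DNS_RESOLVER is
 * enabled, in which case they are resolved via a DNS upcall.
 */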
1353
1354 /*
1355  * Process an incoming message.  This runs in the worker thread; the
1356  * dispatch callback must be careful not to wait on other incoming
1357  * messages or it may deadlock.
1358  */
1359 void ceph_con_process_message(struct ceph_connection *con)
1360 {
1361 struct ceph_msg *msg = con->in_msg;
1362
1363 BUG_ON(con->in_msg->con != con);
1364 con->in_msg = NULL;
1365
1366
1367 if (con->peer_name.type == 0)
1368 con->peer_name = msg->hdr.src;
1369
1370 con->in_seq++;
1371 mutex_unlock(&con->mutex);
1372
1373 dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
1374 msg, le64_to_cpu(msg->hdr.seq),
1375 ENTITY_NAME(msg->hdr.src),
1376 le16_to_cpu(msg->hdr.type),
1377 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1378 le32_to_cpu(msg->hdr.front_len),
1379 le32_to_cpu(msg->hdr.middle_len),
1380 le32_to_cpu(msg->hdr.data_len),
1381 con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1382 con->ops->dispatch(con, msg);
1383
1384 mutex_lock(&con->mutex);
1385 }
1386
1387 /*
1388  * Atomically queue work on a connection after the specified delay.
1389  * Bump @con reference to avoid races with connection teardown.
1390  * Returns 0 if work was queued, or an error code otherwise.
1391  */
1392 static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
1393 {
1394 if (!con->ops->get(con)) {
1395 dout("%s %p ref count 0\n", __func__, con);
1396 return -ENOENT;
1397 }
1398
1399 if (delay >= HZ)
1400 delay = round_jiffies_relative(delay);
1401
1402 dout("%s %p %lu\n", __func__, con, delay);
1403 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
1404 dout("%s %p - already queued\n", __func__, con);
1405 con->ops->put(con);
1406 return -EBUSY;
1407 }
1408
1409 return 0;
1410 }
1411
1412 static void queue_con(struct ceph_connection *con)
1413 {
1414 (void) queue_con_delay(con, 0);
1415 }
1416
1417 static void cancel_con(struct ceph_connection *con)
1418 {
1419 if (cancel_delayed_work(&con->work)) {
1420 dout("%s %p\n", __func__, con);
1421 con->ops->put(con);
1422 }
1423 }
1424
1425 static bool con_sock_closed(struct ceph_connection *con)
1426 {
1427 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
1428 return false;
1429
1430 #define CASE(x) \
1431 case CEPH_CON_S_ ## x: \
1432 con->error_msg = "socket closed (con state " #x ")"; \
1433 break;
1434
1435 switch (con->state) {
1436 CASE(CLOSED);
1437 CASE(PREOPEN);
1438 CASE(V1_BANNER);
1439 CASE(V1_CONNECT_MSG);
1440 CASE(V2_BANNER_PREFIX);
1441 CASE(V2_BANNER_PAYLOAD);
1442 CASE(V2_HELLO);
1443 CASE(V2_AUTH);
1444 CASE(V2_AUTH_SIGNATURE);
1445 CASE(V2_SESSION_CONNECT);
1446 CASE(V2_SESSION_RECONNECT);
1447 CASE(OPEN);
1448 CASE(STANDBY);
1449 default:
1450 BUG();
1451 }
1452 #undef CASE
1453
1454 return true;
1455 }
1456
1457 static bool con_backoff(struct ceph_connection *con)
1458 {
1459 int ret;
1460
1461 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
1462 return false;
1463
1464 ret = queue_con_delay(con, con->delay);
1465 if (ret) {
1466 dout("%s: con %p FAILED to back off %lu\n", __func__,
1467 con, con->delay);
1468 BUG_ON(ret == -ENOENT);
1469 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
1470 }
1471
1472 return true;
1473 }
1474
1475
1476
1477 static void con_fault_finish(struct ceph_connection *con)
1478 {
1479 dout("%s %p\n", __func__, con);
1480
1481 /*
1482  * In case we faulted due to authentication, invalidate our
1483  * current tickets so that we can get new ones.
1484  */
1485 if (con->v1.auth_retry) {
1486 dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
1487 if (con->ops->invalidate_authorizer)
1488 con->ops->invalidate_authorizer(con);
1489 con->v1.auth_retry = 0;
1490 }
1491
1492 if (con->ops->fault)
1493 con->ops->fault(con);
1494 }
1495
1496
1497
1498
1499 static void ceph_con_workfn(struct work_struct *work)
1500 {
1501 struct ceph_connection *con = container_of(work, struct ceph_connection,
1502 work.work);
1503 bool fault;
1504
1505 mutex_lock(&con->mutex);
1506 while (true) {
1507 int ret;
1508
1509 if ((fault = con_sock_closed(con))) {
1510 dout("%s: con %p SOCK_CLOSED\n", __func__, con);
1511 break;
1512 }
1513 if (con_backoff(con)) {
1514 dout("%s: con %p BACKOFF\n", __func__, con);
1515 break;
1516 }
1517 if (con->state == CEPH_CON_S_STANDBY) {
1518 dout("%s: con %p STANDBY\n", __func__, con);
1519 break;
1520 }
1521 if (con->state == CEPH_CON_S_CLOSED) {
1522 dout("%s: con %p CLOSED\n", __func__, con);
1523 BUG_ON(con->sock);
1524 break;
1525 }
1526 if (con->state == CEPH_CON_S_PREOPEN) {
1527 dout("%s: con %p PREOPEN\n", __func__, con);
1528 BUG_ON(con->sock);
1529 }
1530
1531 if (ceph_msgr2(from_msgr(con->msgr)))
1532 ret = ceph_con_v2_try_read(con);
1533 else
1534 ret = ceph_con_v1_try_read(con);
1535 if (ret < 0) {
1536 if (ret == -EAGAIN)
1537 continue;
1538 if (!con->error_msg)
1539 con->error_msg = "socket error on read";
1540 fault = true;
1541 break;
1542 }
1543
1544 if (ceph_msgr2(from_msgr(con->msgr)))
1545 ret = ceph_con_v2_try_write(con);
1546 else
1547 ret = ceph_con_v1_try_write(con);
1548 if (ret < 0) {
1549 if (ret == -EAGAIN)
1550 continue;
1551 if (!con->error_msg)
1552 con->error_msg = "socket error on write";
1553 fault = true;
1554 }
1555
1556 break;
1557 }
1558 if (fault)
1559 con_fault(con);
1560 mutex_unlock(&con->mutex);
1561
1562 if (fault)
1563 con_fault_finish(con);
1564
1565 con->ops->put(con);
1566 }
1567
1568 /*
1569  * Generic error/fault handler.  A retry mechanism is used with
1570  * exponential backoff.
1571  */
1572 static void con_fault(struct ceph_connection *con)
1573 {
1574 dout("fault %p state %d to peer %s\n",
1575 con, con->state, ceph_pr_addr(&con->peer_addr));
1576
1577 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1578 ceph_pr_addr(&con->peer_addr), con->error_msg);
1579 con->error_msg = NULL;
1580
1581 WARN_ON(con->state == CEPH_CON_S_STANDBY ||
1582 con->state == CEPH_CON_S_CLOSED);
1583
1584 ceph_con_reset_protocol(con);
1585
1586 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
1587 dout("fault on LOSSYTX channel, marking CLOSED\n");
1588 con->state = CEPH_CON_S_CLOSED;
1589 return;
1590 }
1591
1592
1593 list_splice_init(&con->out_sent, &con->out_queue);
1594
1595
1596
1597 if (list_empty(&con->out_queue) &&
1598 !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
1599 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
1600 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1601 con->state = CEPH_CON_S_STANDBY;
1602 } else {
1603
1604 con->state = CEPH_CON_S_PREOPEN;
1605 if (!con->delay) {
1606 con->delay = BASE_DELAY_INTERVAL;
1607 } else if (con->delay < MAX_DELAY_INTERVAL) {
1608 con->delay *= 2;
1609 if (con->delay > MAX_DELAY_INTERVAL)
1610 con->delay = MAX_DELAY_INTERVAL;
1611 }
1612 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
1613 queue_con(con);
1614 }
1615 }
1616
1617 void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
1618 {
1619 u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
1620 msgr->inst.addr.nonce = cpu_to_le32(nonce);
1621 ceph_encode_my_addr(msgr);
1622 }
1623
1624
1625
1626
1627 void ceph_messenger_init(struct ceph_messenger *msgr,
1628 struct ceph_entity_addr *myaddr)
1629 {
1630 spin_lock_init(&msgr->global_seq_lock);
1631
1632 if (myaddr) {
1633 memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
1634 sizeof(msgr->inst.addr.in_addr));
1635 ceph_addr_set_port(&msgr->inst.addr, 0);
1636 }
1637
1638 /*
1639  * Since nautilus, clients are identified using type ANY.  For
1640  * msgr1, ceph_encode_banner_addr() munges it to NONE.
1641  */
1642 msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
1643
1644
1645 do {
1646 get_random_bytes(&msgr->inst.addr.nonce,
1647 sizeof(msgr->inst.addr.nonce));
1648 } while (!msgr->inst.addr.nonce);
1649 ceph_encode_my_addr(msgr);
1650
1651 atomic_set(&msgr->stopping, 0);
1652 write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
1653
1654 dout("%s %p\n", __func__, msgr);
1655 }
1656
1657 void ceph_messenger_fini(struct ceph_messenger *msgr)
1658 {
1659 put_net(read_pnet(&msgr->net));
1660 }
1661
1662 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
1663 {
1664 if (msg->con)
1665 msg->con->ops->put(msg->con);
1666
1667 msg->con = con ? con->ops->get(con) : NULL;
1668 BUG_ON(msg->con != con);
1669 }
1670
1671 static void clear_standby(struct ceph_connection *con)
1672 {
1673
1674 if (con->state == CEPH_CON_S_STANDBY) {
1675 dout("clear_standby %p and ++connect_seq\n", con);
1676 con->state = CEPH_CON_S_PREOPEN;
1677 con->v1.connect_seq++;
1678 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
1679 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
1680 }
1681 }
1682
1683
1684 /*
1685  * Queue up an outgoing message on the given connection.
1686  * Consumes a ref on @msg.
1687  */
1688 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1689 {
1690
1691 msg->hdr.src = con->msgr->inst.name;
1692 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
1693 msg->needs_out_seq = true;
1694
1695 mutex_lock(&con->mutex);
1696
1697 if (con->state == CEPH_CON_S_CLOSED) {
1698 dout("con_send %p closed, dropping %p\n", con, msg);
1699 ceph_msg_put(msg);
1700 mutex_unlock(&con->mutex);
1701 return;
1702 }
1703
1704 msg_con_set(msg, con);
1705
1706 BUG_ON(!list_empty(&msg->list_head));
1707 list_add_tail(&msg->list_head, &con->out_queue);
1708 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
1709 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
1710 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1711 le32_to_cpu(msg->hdr.front_len),
1712 le32_to_cpu(msg->hdr.middle_len),
1713 le32_to_cpu(msg->hdr.data_len));
1714
1715 clear_standby(con);
1716 mutex_unlock(&con->mutex);
1717
1718
1719
1720 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
1721 queue_con(con);
1722 }
1723 EXPORT_SYMBOL(ceph_con_send);
1724
1725
1726
1727
1728 void ceph_msg_revoke(struct ceph_msg *msg)
1729 {
1730 struct ceph_connection *con = msg->con;
1731
1732 if (!con) {
1733 dout("%s msg %p null con\n", __func__, msg);
1734 return;
1735 }
1736
1737 mutex_lock(&con->mutex);
1738 if (list_empty(&msg->list_head)) {
1739 WARN_ON(con->out_msg == msg);
1740 dout("%s con %p msg %p not linked\n", __func__, con, msg);
1741 mutex_unlock(&con->mutex);
1742 return;
1743 }
1744
1745 dout("%s con %p msg %p was linked\n", __func__, con, msg);
1746 msg->hdr.seq = 0;
1747 ceph_msg_remove(msg);
1748
1749 if (con->out_msg == msg) {
1750 WARN_ON(con->state != CEPH_CON_S_OPEN);
1751 dout("%s con %p msg %p was sending\n", __func__, con, msg);
1752 if (ceph_msgr2(from_msgr(con->msgr)))
1753 ceph_con_v2_revoke(con);
1754 else
1755 ceph_con_v1_revoke(con);
1756 ceph_msg_put(con->out_msg);
1757 con->out_msg = NULL;
1758 } else {
1759 dout("%s con %p msg %p not current, out_msg %p\n", __func__,
1760 con, msg, con->out_msg);
1761 }
1762 mutex_unlock(&con->mutex);
1763 }
1764
1765
1766
1767
1768 void ceph_msg_revoke_incoming(struct ceph_msg *msg)
1769 {
1770 struct ceph_connection *con = msg->con;
1771
1772 if (!con) {
1773 dout("%s msg %p null con\n", __func__, msg);
1774 return;
1775 }
1776
1777 mutex_lock(&con->mutex);
1778 if (con->in_msg == msg) {
1779 WARN_ON(con->state != CEPH_CON_S_OPEN);
1780 dout("%s con %p msg %p was recving\n", __func__, con, msg);
1781 if (ceph_msgr2(from_msgr(con->msgr)))
1782 ceph_con_v2_revoke_incoming(con);
1783 else
1784 ceph_con_v1_revoke_incoming(con);
1785 ceph_msg_put(con->in_msg);
1786 con->in_msg = NULL;
1787 } else {
1788 dout("%s con %p msg %p not current, in_msg %p\n", __func__,
1789 con, msg, con->in_msg);
1790 }
1791 mutex_unlock(&con->mutex);
1792 }
1793
1794
1795
1796
1797 void ceph_con_keepalive(struct ceph_connection *con)
1798 {
1799 dout("con_keepalive %p\n", con);
1800 mutex_lock(&con->mutex);
1801 clear_standby(con);
1802 ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
1803 mutex_unlock(&con->mutex);
1804
1805 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
1806 queue_con(con);
1807 }
1808 EXPORT_SYMBOL(ceph_con_keepalive);
1809
1810 bool ceph_con_keepalive_expired(struct ceph_connection *con,
1811 unsigned long interval)
1812 {
1813 if (interval > 0 &&
1814 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
1815 struct timespec64 now;
1816 struct timespec64 ts;
1817 ktime_get_real_ts64(&now);
1818 jiffies_to_timespec64(interval, &ts);
1819 ts = timespec64_add(con->last_keepalive_ack, ts);
1820 return timespec64_compare(&now, &ts) >= 0;
1821 }
1822 return false;
1823 }
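/*
 * For example (illustrative): with interval = 10 * HZ a caller gets
 * true once more than 10 seconds have passed since the last KEEPALIVE2
 * ack was recorded in con->last_keepalive_ack, and false otherwise (or
 * if the peer doesn't support KEEPALIVE2, or interval is 0).
 */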
1824
1825 static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
1826 {
1827 BUG_ON(msg->num_data_items >= msg->max_data_items);
1828 return &msg->data[msg->num_data_items++];
1829 }
1830
1831 static void ceph_msg_data_destroy(struct ceph_msg_data *data)
1832 {
1833 if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
1834 int num_pages = calc_pages_for(data->alignment, data->length);
1835 ceph_release_page_vector(data->pages, num_pages);
1836 } else if (data->type == CEPH_MSG_DATA_PAGELIST) {
1837 ceph_pagelist_release(data->pagelist);
1838 }
1839 }
1840
1841 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
1842 size_t length, size_t alignment, bool own_pages)
1843 {
1844 struct ceph_msg_data *data;
1845
1846 BUG_ON(!pages);
1847 BUG_ON(!length);
1848
1849 data = ceph_msg_data_add(msg);
1850 data->type = CEPH_MSG_DATA_PAGES;
1851 data->pages = pages;
1852 data->length = length;
1853 data->alignment = alignment & ~PAGE_MASK;
1854 data->own_pages = own_pages;
1855
1856 msg->data_length += length;
1857 }
1858 EXPORT_SYMBOL(ceph_msg_data_add_pages);
1859
1860 void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
1861 struct ceph_pagelist *pagelist)
1862 {
1863 struct ceph_msg_data *data;
1864
1865 BUG_ON(!pagelist);
1866 BUG_ON(!pagelist->length);
1867
1868 data = ceph_msg_data_add(msg);
1869 data->type = CEPH_MSG_DATA_PAGELIST;
1870 refcount_inc(&pagelist->refcnt);
1871 data->pagelist = pagelist;
1872
1873 msg->data_length += pagelist->length;
1874 }
1875 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
1876
1877 #ifdef CONFIG_BLOCK
1878 void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
1879 u32 length)
1880 {
1881 struct ceph_msg_data *data;
1882
1883 data = ceph_msg_data_add(msg);
1884 data->type = CEPH_MSG_DATA_BIO;
1885 data->bio_pos = *bio_pos;
1886 data->bio_length = length;
1887
1888 msg->data_length += length;
1889 }
1890 EXPORT_SYMBOL(ceph_msg_data_add_bio);
1891 #endif
1892
1893 void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
1894 struct ceph_bvec_iter *bvec_pos)
1895 {
1896 struct ceph_msg_data *data;
1897
1898 data = ceph_msg_data_add(msg);
1899 data->type = CEPH_MSG_DATA_BVECS;
1900 data->bvec_pos = *bvec_pos;
1901
1902 msg->data_length += bvec_pos->iter.bi_size;
1903 }
1904 EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
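/*
 * Note that the ceph_msg_data_add_*() helpers above only record a
 * reference to the caller's pages/pagelist/bio/bvecs and bump
 * msg->data_length; the actual pages are walked later by the data
 * cursor when the message is transmitted or received.
 */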
1905
1906 /*
1907  * Construct a new message with given type, size.
1908  * The new msg has a ref count of 1.
1909  */
1910 struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
1911 gfp_t flags, bool can_fail)
1912 {
1913 struct ceph_msg *m;
1914
1915 m = kmem_cache_zalloc(ceph_msg_cache, flags);
1916 if (m == NULL)
1917 goto out;
1918
1919 m->hdr.type = cpu_to_le16(type);
1920 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
1921 m->hdr.front_len = cpu_to_le32(front_len);
1922
1923 INIT_LIST_HEAD(&m->list_head);
1924 kref_init(&m->kref);
1925
1926
1927 if (front_len) {
1928 m->front.iov_base = kvmalloc(front_len, flags);
1929 if (m->front.iov_base == NULL) {
1930 dout("ceph_msg_new can't allocate %d bytes\n",
1931 front_len);
1932 goto out2;
1933 }
1934 } else {
1935 m->front.iov_base = NULL;
1936 }
1937 m->front_alloc_len = m->front.iov_len = front_len;
1938
1939 if (max_data_items) {
1940 m->data = kmalloc_array(max_data_items, sizeof(*m->data),
1941 flags);
1942 if (!m->data)
1943 goto out2;
1944
1945 m->max_data_items = max_data_items;
1946 }
1947
1948 dout("ceph_msg_new %p front %d\n", m, front_len);
1949 return m;
1950
1951 out2:
1952 ceph_msg_put(m);
1953 out:
1954 if (!can_fail) {
1955 pr_err("msg_new can't create type %d front %d\n", type,
1956 front_len);
1957 WARN_ON(1);
1958 } else {
1959 dout("msg_new can't create type %d front %d\n", type,
1960 front_len);
1961 }
1962 return NULL;
1963 }
1964 EXPORT_SYMBOL(ceph_msg_new2);
1965
1966 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
1967 bool can_fail)
1968 {
1969 return ceph_msg_new2(type, front_len, 0, flags, can_fail);
1970 }
1971 EXPORT_SYMBOL(ceph_msg_new);
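/*
 * Rough usage sketch (illustrative, with made-up sizes): a caller that
 * wants to attach one data item allocates the message with room for it
 * and then adds the pages:
 *
 *	msg = ceph_msg_new2(CEPH_MSG_OSD_OP, front_len, 1, GFP_NOFS, false);
 *	if (!msg)
 *		return -ENOMEM;
 *	ceph_msg_data_add_pages(msg, pages, len, 0, false);
 *	ceph_con_send(con, msg);     // consumes the ref from ceph_msg_new2()
 */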
1972
1973
1974
1975
1976
1977
1978
1979
1980 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
1981 {
1982 int type = le16_to_cpu(msg->hdr.type);
1983 int middle_len = le32_to_cpu(msg->hdr.middle_len);
1984
1985 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
1986 ceph_msg_type_name(type), middle_len);
1987 BUG_ON(!middle_len);
1988 BUG_ON(msg->middle);
1989
1990 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
1991 if (!msg->middle)
1992 return -ENOMEM;
1993 return 0;
1994 }
1995
1996 /*
1997  * Allocate a message for receiving an incoming message on a
1998  * connection, and save the result in con->in_msg.  Uses the
1999  * connection's private alloc_msg op if available.
2000  *
2001  * Returns 0 on success, or a negative error code.
2002  *
2003  * On success, if we set *skip = 1:
2004  *  - the next message should be skipped and ignored.
2005  *  - con->in_msg == NULL
2006  * or if we set *skip = 0:
2007  *  - con->in_msg is non-null.
2008  * On error (ENOMEM, EAGAIN, ...),
2009  *  - con->in_msg == NULL
2010  */
2011 int ceph_con_in_msg_alloc(struct ceph_connection *con,
2012 struct ceph_msg_header *hdr, int *skip)
2013 {
2014 int middle_len = le32_to_cpu(hdr->middle_len);
2015 struct ceph_msg *msg;
2016 int ret = 0;
2017
2018 BUG_ON(con->in_msg != NULL);
2019 BUG_ON(!con->ops->alloc_msg);
2020
2021 mutex_unlock(&con->mutex);
2022 msg = con->ops->alloc_msg(con, hdr, skip);
2023 mutex_lock(&con->mutex);
2024 if (con->state != CEPH_CON_S_OPEN) {
2025 if (msg)
2026 ceph_msg_put(msg);
2027 return -EAGAIN;
2028 }
2029 if (msg) {
2030 BUG_ON(*skip);
2031 msg_con_set(msg, con);
2032 con->in_msg = msg;
2033 } else {
2034 /*
2035  * Null message pointer means either we should skip
2036  * this message or we couldn't allocate memory.  The
2037  * former is not an error.
2038  */
2039 if (*skip)
2040 return 0;
2041
2042 con->error_msg = "error allocating memory for incoming message";
2043 return -ENOMEM;
2044 }
2045 memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));
2046
2047 if (middle_len && !con->in_msg->middle) {
2048 ret = ceph_alloc_middle(con, con->in_msg);
2049 if (ret < 0) {
2050 ceph_msg_put(con->in_msg);
2051 con->in_msg = NULL;
2052 }
2053 }
2054
2055 return ret;
2056 }
2057
2058 void ceph_con_get_out_msg(struct ceph_connection *con)
2059 {
2060 struct ceph_msg *msg;
2061
2062 BUG_ON(list_empty(&con->out_queue));
2063 msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
2064 WARN_ON(msg->con != con);
2065
2066 /*
2067  * Put the message on the "sent" list using the ref from
2068  * ceph_con_send().  It is put when the message is acked or revoked.
2069  */
2070 list_move_tail(&msg->list_head, &con->out_sent);
2071
2072 /*
2073  * Only assign an outgoing seq # if we haven't sent this message
2074  * yet.  If it is requeued, resend with its original seq.
2075  */
2076 if (msg->needs_out_seq) {
2077 msg->hdr.seq = cpu_to_le64(++con->out_seq);
2078 msg->needs_out_seq = false;
2079
2080 if (con->ops->reencode_message)
2081 con->ops->reencode_message(msg);
2082 }
2083
2084 /*
2085  * Get a ref for out_msg.  It is put when we are done sending the
2086  * message or in case of a fault.
2087  */
2088 WARN_ON(con->out_msg);
2089 con->out_msg = ceph_msg_get(msg);
2090 }
2091
2092
2093
2094
2095 static void ceph_msg_free(struct ceph_msg *m)
2096 {
2097 dout("%s %p\n", __func__, m);
2098 kvfree(m->front.iov_base);
2099 kfree(m->data);
2100 kmem_cache_free(ceph_msg_cache, m);
2101 }
2102
2103 static void ceph_msg_release(struct kref *kref)
2104 {
2105 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
2106 int i;
2107
2108 dout("%s %p\n", __func__, m);
2109 WARN_ON(!list_empty(&m->list_head));
2110
2111 msg_con_set(m, NULL);
2112
2113
2114 if (m->middle) {
2115 ceph_buffer_put(m->middle);
2116 m->middle = NULL;
2117 }
2118
2119 for (i = 0; i < m->num_data_items; i++)
2120 ceph_msg_data_destroy(&m->data[i]);
2121
2122 if (m->pool)
2123 ceph_msgpool_put(m->pool, m);
2124 else
2125 ceph_msg_free(m);
2126 }
2127
2128 struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
2129 {
2130 dout("%s %p (was %d)\n", __func__, msg,
2131 kref_read(&msg->kref));
2132 kref_get(&msg->kref);
2133 return msg;
2134 }
2135 EXPORT_SYMBOL(ceph_msg_get);
2136
2137 void ceph_msg_put(struct ceph_msg *msg)
2138 {
2139 dout("%s %p (was %d)\n", __func__, msg,
2140 kref_read(&msg->kref));
2141 kref_put(&msg->kref, ceph_msg_release);
2142 }
2143 EXPORT_SYMBOL(ceph_msg_put);
2144
2145 void ceph_msg_dump(struct ceph_msg *msg)
2146 {
2147 pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
2148 msg->front_alloc_len, msg->data_length);
2149 print_hex_dump(KERN_DEBUG, "header: ",
2150 DUMP_PREFIX_OFFSET, 16, 1,
2151 &msg->hdr, sizeof(msg->hdr), true);
2152 print_hex_dump(KERN_DEBUG, " front: ",
2153 DUMP_PREFIX_OFFSET, 16, 1,
2154 msg->front.iov_base, msg->front.iov_len, true);
2155 if (msg->middle)
2156 print_hex_dump(KERN_DEBUG, "middle: ",
2157 DUMP_PREFIX_OFFSET, 16, 1,
2158 msg->middle->vec.iov_base,
2159 msg->middle->vec.iov_len, true);
2160 print_hex_dump(KERN_DEBUG, "footer: ",
2161 DUMP_PREFIX_OFFSET, 16, 1,
2162 &msg->footer, sizeof(msg->footer), true);
2163 }
2164 EXPORT_SYMBOL(ceph_msg_dump);