Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 #include <linux/ceph/ceph_debug.h>
0003 
0004 #include <linux/bvec.h>
0005 #include <linux/crc32c.h>
0006 #include <linux/net.h>
0007 #include <linux/socket.h>
0008 #include <net/sock.h>
0009 
0010 #include <linux/ceph/ceph_features.h>
0011 #include <linux/ceph/decode.h>
0012 #include <linux/ceph/libceph.h>
0013 #include <linux/ceph/messenger.h>
0014 
/*
 * Static tag bytes (protocol control messages).
 *
 * These are kept in variables rather than used as bare constants
 * because their addresses are queued in the outgoing kvec array via
 * con_out_kvec_add().
 */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
0020 
0021 /*
0022  * If @buf is NULL, discard up to @len bytes.
0023  */
0024 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
0025 {
0026     struct kvec iov = {buf, len};
0027     struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
0028     int r;
0029 
0030     if (!buf)
0031         msg.msg_flags |= MSG_TRUNC;
0032 
0033     iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
0034     r = sock_recvmsg(sock, &msg, msg.msg_flags);
0035     if (r == -EAGAIN)
0036         r = 0;
0037     return r;
0038 }
0039 
0040 static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
0041              int page_offset, size_t length)
0042 {
0043     struct bio_vec bvec = {
0044         .bv_page = page,
0045         .bv_offset = page_offset,
0046         .bv_len = length
0047     };
0048     struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
0049     int r;
0050 
0051     BUG_ON(page_offset + length > PAGE_SIZE);
0052     iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
0053     r = sock_recvmsg(sock, &msg, msg.msg_flags);
0054     if (r == -EAGAIN)
0055         r = 0;
0056     return r;
0057 }
0058 
0059 /*
0060  * write something.  @more is true if caller will be sending more data
0061  * shortly.
0062  */
0063 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
0064                 size_t kvlen, size_t len, bool more)
0065 {
0066     struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
0067     int r;
0068 
0069     if (more)
0070         msg.msg_flags |= MSG_MORE;
0071     else
0072         msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
0073 
0074     r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
0075     if (r == -EAGAIN)
0076         r = 0;
0077     return r;
0078 }
0079 
0080 /*
0081  * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
0082  */
0083 static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
0084                  int offset, size_t size, int more)
0085 {
0086     ssize_t (*sendpage)(struct socket *sock, struct page *page,
0087                 int offset, size_t size, int flags);
0088     int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
0089     int ret;
0090 
0091     /*
0092      * sendpage cannot properly handle pages with page_count == 0,
0093      * we need to fall back to sendmsg if that's the case.
0094      *
0095      * Same goes for slab pages: skb_can_coalesce() allows
0096      * coalescing neighboring slab objects into a single frag which
0097      * triggers one of hardened usercopy checks.
0098      */
0099     if (sendpage_ok(page))
0100         sendpage = sock->ops->sendpage;
0101     else
0102         sendpage = sock_no_sendpage;
0103 
0104     ret = sendpage(sock, page, offset, size, flags);
0105     if (ret == -EAGAIN)
0106         ret = 0;
0107 
0108     return ret;
0109 }
0110 
0111 static void con_out_kvec_reset(struct ceph_connection *con)
0112 {
0113     BUG_ON(con->v1.out_skip);
0114 
0115     con->v1.out_kvec_left = 0;
0116     con->v1.out_kvec_bytes = 0;
0117     con->v1.out_kvec_cur = &con->v1.out_kvec[0];
0118 }
0119 
0120 static void con_out_kvec_add(struct ceph_connection *con,
0121                 size_t size, void *data)
0122 {
0123     int index = con->v1.out_kvec_left;
0124 
0125     BUG_ON(con->v1.out_skip);
0126     BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
0127 
0128     con->v1.out_kvec[index].iov_len = size;
0129     con->v1.out_kvec[index].iov_base = data;
0130     con->v1.out_kvec_left++;
0131     con->v1.out_kvec_bytes += size;
0132 }
0133 
0134 /*
0135  * Chop off a kvec from the end.  Return residual number of bytes for
0136  * that kvec, i.e. how many bytes would have been written if the kvec
0137  * hadn't been nuked.
0138  */
0139 static int con_out_kvec_skip(struct ceph_connection *con)
0140 {
0141     int skip = 0;
0142 
0143     if (con->v1.out_kvec_bytes > 0) {
0144         skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
0145         BUG_ON(con->v1.out_kvec_bytes < skip);
0146         BUG_ON(!con->v1.out_kvec_left);
0147         con->v1.out_kvec_bytes -= skip;
0148         con->v1.out_kvec_left--;
0149     }
0150 
0151     return skip;
0152 }
0153 
0154 static size_t sizeof_footer(struct ceph_connection *con)
0155 {
0156     return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
0157         sizeof(struct ceph_msg_footer) :
0158         sizeof(struct ceph_msg_footer_old);
0159 }
0160 
0161 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
0162 {
0163     /* Initialize data cursor */
0164 
0165     ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
0166 }
0167 
0168 /*
0169  * Prepare footer for currently outgoing message, and finish things
0170  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
0171  */
0172 static void prepare_write_message_footer(struct ceph_connection *con)
0173 {
0174     struct ceph_msg *m = con->out_msg;
0175 
0176     m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
0177 
0178     dout("prepare_write_message_footer %p\n", con);
0179     con_out_kvec_add(con, sizeof_footer(con), &m->footer);
0180     if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
0181         if (con->ops->sign_message)
0182             con->ops->sign_message(m);
0183         else
0184             m->footer.sig = 0;
0185     } else {
0186         m->old_footer.flags = m->footer.flags;
0187     }
0188     con->v1.out_more = m->more_to_follow;
0189     con->v1.out_msg_done = true;
0190 }
0191 
0192 /*
0193  * Prepare headers for the next outgoing message.
0194  */
0195 static void prepare_write_message(struct ceph_connection *con)
0196 {
0197     struct ceph_msg *m;
0198     u32 crc;
0199 
0200     con_out_kvec_reset(con);
0201     con->v1.out_msg_done = false;
0202 
0203     /* Sneak an ack in there first?  If we can get it into the same
0204      * TCP packet that's a good thing. */
0205     if (con->in_seq > con->in_seq_acked) {
0206         con->in_seq_acked = con->in_seq;
0207         con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
0208         con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
0209         con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
0210             &con->v1.out_temp_ack);
0211     }
0212 
0213     ceph_con_get_out_msg(con);
0214     m = con->out_msg;
0215 
0216     dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
0217          m, con->out_seq, le16_to_cpu(m->hdr.type),
0218          le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
0219          m->data_length);
0220     WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
0221     WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
0222 
0223     /* tag + hdr + front + middle */
0224     con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
0225     con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
0226     con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
0227 
0228     if (m->middle)
0229         con_out_kvec_add(con, m->middle->vec.iov_len,
0230             m->middle->vec.iov_base);
0231 
0232     /* fill in hdr crc and finalize hdr */
0233     crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
0234     con->out_msg->hdr.crc = cpu_to_le32(crc);
0235     memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
0236 
0237     /* fill in front and middle crc, footer */
0238     crc = crc32c(0, m->front.iov_base, m->front.iov_len);
0239     con->out_msg->footer.front_crc = cpu_to_le32(crc);
0240     if (m->middle) {
0241         crc = crc32c(0, m->middle->vec.iov_base,
0242                 m->middle->vec.iov_len);
0243         con->out_msg->footer.middle_crc = cpu_to_le32(crc);
0244     } else
0245         con->out_msg->footer.middle_crc = 0;
0246     dout("%s front_crc %u middle_crc %u\n", __func__,
0247          le32_to_cpu(con->out_msg->footer.front_crc),
0248          le32_to_cpu(con->out_msg->footer.middle_crc));
0249     con->out_msg->footer.flags = 0;
0250 
0251     /* is there a data payload? */
0252     con->out_msg->footer.data_crc = 0;
0253     if (m->data_length) {
0254         prepare_message_data(con->out_msg, m->data_length);
0255         con->v1.out_more = 1;  /* data + footer will follow */
0256     } else {
0257         /* no, queue up footer too and be done */
0258         prepare_write_message_footer(con);
0259     }
0260 
0261     ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
0262 }
0263 
0264 /*
0265  * Prepare an ack.
0266  */
0267 static void prepare_write_ack(struct ceph_connection *con)
0268 {
0269     dout("prepare_write_ack %p %llu -> %llu\n", con,
0270          con->in_seq_acked, con->in_seq);
0271     con->in_seq_acked = con->in_seq;
0272 
0273     con_out_kvec_reset(con);
0274 
0275     con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
0276 
0277     con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
0278     con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
0279              &con->v1.out_temp_ack);
0280 
0281     con->v1.out_more = 1;  /* more will follow.. eventually.. */
0282     ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
0283 }
0284 
0285 /*
0286  * Prepare to share the seq during handshake
0287  */
0288 static void prepare_write_seq(struct ceph_connection *con)
0289 {
0290     dout("prepare_write_seq %p %llu -> %llu\n", con,
0291          con->in_seq_acked, con->in_seq);
0292     con->in_seq_acked = con->in_seq;
0293 
0294     con_out_kvec_reset(con);
0295 
0296     con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
0297     con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
0298              &con->v1.out_temp_ack);
0299 
0300     ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
0301 }
0302 
0303 /*
0304  * Prepare to write keepalive byte.
0305  */
0306 static void prepare_write_keepalive(struct ceph_connection *con)
0307 {
0308     dout("prepare_write_keepalive %p\n", con);
0309     con_out_kvec_reset(con);
0310     if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
0311         struct timespec64 now;
0312 
0313         ktime_get_real_ts64(&now);
0314         con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
0315         ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
0316         con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
0317                  &con->v1.out_temp_keepalive2);
0318     } else {
0319         con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
0320     }
0321     ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
0322 }
0323 
0324 /*
0325  * Connection negotiation.
0326  */
0327 
0328 static int get_connect_authorizer(struct ceph_connection *con)
0329 {
0330     struct ceph_auth_handshake *auth;
0331     int auth_proto;
0332 
0333     if (!con->ops->get_authorizer) {
0334         con->v1.auth = NULL;
0335         con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
0336         con->v1.out_connect.authorizer_len = 0;
0337         return 0;
0338     }
0339 
0340     auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
0341     if (IS_ERR(auth))
0342         return PTR_ERR(auth);
0343 
0344     con->v1.auth = auth;
0345     con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
0346     con->v1.out_connect.authorizer_len =
0347         cpu_to_le32(auth->authorizer_buf_len);
0348     return 0;
0349 }
0350 
0351 /*
0352  * We connected to a peer and are saying hello.
0353  */
0354 static void prepare_write_banner(struct ceph_connection *con)
0355 {
0356     con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
0357     con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
0358                     &con->msgr->my_enc_addr);
0359 
0360     con->v1.out_more = 0;
0361     ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
0362 }
0363 
0364 static void __prepare_write_connect(struct ceph_connection *con)
0365 {
0366     con_out_kvec_add(con, sizeof(con->v1.out_connect),
0367              &con->v1.out_connect);
0368     if (con->v1.auth)
0369         con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
0370                  con->v1.auth->authorizer_buf);
0371 
0372     con->v1.out_more = 0;
0373     ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
0374 }
0375 
0376 static int prepare_write_connect(struct ceph_connection *con)
0377 {
0378     unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
0379     int proto;
0380     int ret;
0381 
0382     switch (con->peer_name.type) {
0383     case CEPH_ENTITY_TYPE_MON:
0384         proto = CEPH_MONC_PROTOCOL;
0385         break;
0386     case CEPH_ENTITY_TYPE_OSD:
0387         proto = CEPH_OSDC_PROTOCOL;
0388         break;
0389     case CEPH_ENTITY_TYPE_MDS:
0390         proto = CEPH_MDSC_PROTOCOL;
0391         break;
0392     default:
0393         BUG();
0394     }
0395 
0396     dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
0397          con->v1.connect_seq, global_seq, proto);
0398 
0399     con->v1.out_connect.features =
0400         cpu_to_le64(from_msgr(con->msgr)->supported_features);
0401     con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
0402     con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
0403     con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
0404     con->v1.out_connect.protocol_version = cpu_to_le32(proto);
0405     con->v1.out_connect.flags = 0;
0406 
0407     ret = get_connect_authorizer(con);
0408     if (ret)
0409         return ret;
0410 
0411     __prepare_write_connect(con);
0412     return 0;
0413 }
0414 
0415 /*
0416  * write as much of pending kvecs to the socket as we can.
0417  *  1 -> done
0418  *  0 -> socket full, but more to do
0419  * <0 -> error
0420  */
0421 static int write_partial_kvec(struct ceph_connection *con)
0422 {
0423     int ret;
0424 
0425     dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
0426     while (con->v1.out_kvec_bytes > 0) {
0427         ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
0428                        con->v1.out_kvec_left,
0429                        con->v1.out_kvec_bytes,
0430                        con->v1.out_more);
0431         if (ret <= 0)
0432             goto out;
0433         con->v1.out_kvec_bytes -= ret;
0434         if (!con->v1.out_kvec_bytes)
0435             break;            /* done */
0436 
0437         /* account for full iov entries consumed */
0438         while (ret >= con->v1.out_kvec_cur->iov_len) {
0439             BUG_ON(!con->v1.out_kvec_left);
0440             ret -= con->v1.out_kvec_cur->iov_len;
0441             con->v1.out_kvec_cur++;
0442             con->v1.out_kvec_left--;
0443         }
0444         /* and for a partially-consumed entry */
0445         if (ret) {
0446             con->v1.out_kvec_cur->iov_len -= ret;
0447             con->v1.out_kvec_cur->iov_base += ret;
0448         }
0449     }
0450     con->v1.out_kvec_left = 0;
0451     ret = 1;
0452 out:
0453     dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
0454          con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
0455     return ret;  /* done! */
0456 }
0457 
0458 /*
0459  * Write as much message data payload as we can.  If we finish, queue
0460  * up the footer.
0461  *  1 -> done, footer is now queued in out_kvec[].
0462  *  0 -> socket full, but more to do
0463  * <0 -> error
0464  */
0465 static int write_partial_message_data(struct ceph_connection *con)
0466 {
0467     struct ceph_msg *msg = con->out_msg;
0468     struct ceph_msg_data_cursor *cursor = &msg->cursor;
0469     bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
0470     int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
0471     u32 crc;
0472 
0473     dout("%s %p msg %p\n", __func__, con, msg);
0474 
0475     if (!msg->num_data_items)
0476         return -EINVAL;
0477 
0478     /*
0479      * Iterate through each page that contains data to be
0480      * written, and send as much as possible for each.
0481      *
0482      * If we are calculating the data crc (the default), we will
0483      * need to map the page.  If we have no pages, they have
0484      * been revoked, so use the zero page.
0485      */
0486     crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
0487     while (cursor->total_resid) {
0488         struct page *page;
0489         size_t page_offset;
0490         size_t length;
0491         int ret;
0492 
0493         if (!cursor->resid) {
0494             ceph_msg_data_advance(cursor, 0);
0495             continue;
0496         }
0497 
0498         page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
0499         if (length == cursor->total_resid)
0500             more = MSG_MORE;
0501         ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
0502                     more);
0503         if (ret <= 0) {
0504             if (do_datacrc)
0505                 msg->footer.data_crc = cpu_to_le32(crc);
0506 
0507             return ret;
0508         }
0509         if (do_datacrc && cursor->need_crc)
0510             crc = ceph_crc32c_page(crc, page, page_offset, length);
0511         ceph_msg_data_advance(cursor, (size_t)ret);
0512     }
0513 
0514     dout("%s %p msg %p done\n", __func__, con, msg);
0515 
0516     /* prepare and queue up footer, too */
0517     if (do_datacrc)
0518         msg->footer.data_crc = cpu_to_le32(crc);
0519     else
0520         msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
0521     con_out_kvec_reset(con);
0522     prepare_write_message_footer(con);
0523 
0524     return 1;   /* must return > 0 to indicate success */
0525 }
0526 
0527 /*
0528  * write some zeros
0529  */
0530 static int write_partial_skip(struct ceph_connection *con)
0531 {
0532     int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
0533     int ret;
0534 
0535     dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
0536     while (con->v1.out_skip > 0) {
0537         size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
0538 
0539         if (size == con->v1.out_skip)
0540             more = MSG_MORE;
0541         ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
0542                     more);
0543         if (ret <= 0)
0544             goto out;
0545         con->v1.out_skip -= ret;
0546     }
0547     ret = 1;
0548 out:
0549     return ret;
0550 }
0551 
0552 /*
0553  * Prepare to read connection handshake, or an ack.
0554  */
0555 static void prepare_read_banner(struct ceph_connection *con)
0556 {
0557     dout("prepare_read_banner %p\n", con);
0558     con->v1.in_base_pos = 0;
0559 }
0560 
0561 static void prepare_read_connect(struct ceph_connection *con)
0562 {
0563     dout("prepare_read_connect %p\n", con);
0564     con->v1.in_base_pos = 0;
0565 }
0566 
0567 static void prepare_read_ack(struct ceph_connection *con)
0568 {
0569     dout("prepare_read_ack %p\n", con);
0570     con->v1.in_base_pos = 0;
0571 }
0572 
0573 static void prepare_read_seq(struct ceph_connection *con)
0574 {
0575     dout("prepare_read_seq %p\n", con);
0576     con->v1.in_base_pos = 0;
0577     con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
0578 }
0579 
0580 static void prepare_read_tag(struct ceph_connection *con)
0581 {
0582     dout("prepare_read_tag %p\n", con);
0583     con->v1.in_base_pos = 0;
0584     con->v1.in_tag = CEPH_MSGR_TAG_READY;
0585 }
0586 
0587 static void prepare_read_keepalive_ack(struct ceph_connection *con)
0588 {
0589     dout("prepare_read_keepalive_ack %p\n", con);
0590     con->v1.in_base_pos = 0;
0591 }
0592 
0593 /*
0594  * Prepare to read a message.
0595  */
0596 static int prepare_read_message(struct ceph_connection *con)
0597 {
0598     dout("prepare_read_message %p\n", con);
0599     BUG_ON(con->in_msg != NULL);
0600     con->v1.in_base_pos = 0;
0601     con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
0602     return 0;
0603 }
0604 
0605 static int read_partial(struct ceph_connection *con,
0606             int end, int size, void *object)
0607 {
0608     while (con->v1.in_base_pos < end) {
0609         int left = end - con->v1.in_base_pos;
0610         int have = size - left;
0611         int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
0612         if (ret <= 0)
0613             return ret;
0614         con->v1.in_base_pos += ret;
0615     }
0616     return 1;
0617 }
0618 
0619 /*
0620  * Read all or part of the connect-side handshake on a new connection
0621  */
0622 static int read_partial_banner(struct ceph_connection *con)
0623 {
0624     int size;
0625     int end;
0626     int ret;
0627 
0628     dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
0629 
0630     /* peer's banner */
0631     size = strlen(CEPH_BANNER);
0632     end = size;
0633     ret = read_partial(con, end, size, con->v1.in_banner);
0634     if (ret <= 0)
0635         goto out;
0636 
0637     size = sizeof(con->v1.actual_peer_addr);
0638     end += size;
0639     ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
0640     if (ret <= 0)
0641         goto out;
0642     ceph_decode_banner_addr(&con->v1.actual_peer_addr);
0643 
0644     size = sizeof(con->v1.peer_addr_for_me);
0645     end += size;
0646     ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
0647     if (ret <= 0)
0648         goto out;
0649     ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
0650 
0651 out:
0652     return ret;
0653 }
0654 
0655 static int read_partial_connect(struct ceph_connection *con)
0656 {
0657     int size;
0658     int end;
0659     int ret;
0660 
0661     dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
0662 
0663     size = sizeof(con->v1.in_reply);
0664     end = size;
0665     ret = read_partial(con, end, size, &con->v1.in_reply);
0666     if (ret <= 0)
0667         goto out;
0668 
0669     if (con->v1.auth) {
0670         size = le32_to_cpu(con->v1.in_reply.authorizer_len);
0671         if (size > con->v1.auth->authorizer_reply_buf_len) {
0672             pr_err("authorizer reply too big: %d > %zu\n", size,
0673                    con->v1.auth->authorizer_reply_buf_len);
0674             ret = -EINVAL;
0675             goto out;
0676         }
0677 
0678         end += size;
0679         ret = read_partial(con, end, size,
0680                    con->v1.auth->authorizer_reply_buf);
0681         if (ret <= 0)
0682             goto out;
0683     }
0684 
0685     dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
0686          con, con->v1.in_reply.tag,
0687          le32_to_cpu(con->v1.in_reply.connect_seq),
0688          le32_to_cpu(con->v1.in_reply.global_seq));
0689 out:
0690     return ret;
0691 }
0692 
0693 /*
0694  * Verify the hello banner looks okay.
0695  */
0696 static int verify_hello(struct ceph_connection *con)
0697 {
0698     if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
0699         pr_err("connect to %s got bad banner\n",
0700                ceph_pr_addr(&con->peer_addr));
0701         con->error_msg = "protocol error, bad banner";
0702         return -1;
0703     }
0704     return 0;
0705 }
0706 
0707 static int process_banner(struct ceph_connection *con)
0708 {
0709     struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
0710 
0711     dout("process_banner on %p\n", con);
0712 
0713     if (verify_hello(con) < 0)
0714         return -1;
0715 
0716     /*
0717      * Make sure the other end is who we wanted.  note that the other
0718      * end may not yet know their ip address, so if it's 0.0.0.0, give
0719      * them the benefit of the doubt.
0720      */
0721     if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
0722            sizeof(con->peer_addr)) != 0 &&
0723         !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
0724           con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
0725         pr_warn("wrong peer, want %s/%u, got %s/%u\n",
0726             ceph_pr_addr(&con->peer_addr),
0727             le32_to_cpu(con->peer_addr.nonce),
0728             ceph_pr_addr(&con->v1.actual_peer_addr),
0729             le32_to_cpu(con->v1.actual_peer_addr.nonce));
0730         con->error_msg = "wrong peer at address";
0731         return -1;
0732     }
0733 
0734     /*
0735      * did we learn our address?
0736      */
0737     if (ceph_addr_is_blank(my_addr)) {
0738         memcpy(&my_addr->in_addr,
0739                &con->v1.peer_addr_for_me.in_addr,
0740                sizeof(con->v1.peer_addr_for_me.in_addr));
0741         ceph_addr_set_port(my_addr, 0);
0742         ceph_encode_my_addr(con->msgr);
0743         dout("process_banner learned my addr is %s\n",
0744              ceph_pr_addr(my_addr));
0745     }
0746 
0747     return 0;
0748 }
0749 
0750 static int process_connect(struct ceph_connection *con)
0751 {
0752     u64 sup_feat = from_msgr(con->msgr)->supported_features;
0753     u64 req_feat = from_msgr(con->msgr)->required_features;
0754     u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
0755     int ret;
0756 
0757     dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
0758 
0759     if (con->v1.auth) {
0760         int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
0761 
0762         /*
0763          * Any connection that defines ->get_authorizer()
0764          * should also define ->add_authorizer_challenge() and
0765          * ->verify_authorizer_reply().
0766          *
0767          * See get_connect_authorizer().
0768          */
0769         if (con->v1.in_reply.tag ==
0770                 CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
0771             ret = con->ops->add_authorizer_challenge(
0772                 con, con->v1.auth->authorizer_reply_buf, len);
0773             if (ret < 0)
0774                 return ret;
0775 
0776             con_out_kvec_reset(con);
0777             __prepare_write_connect(con);
0778             prepare_read_connect(con);
0779             return 0;
0780         }
0781 
0782         if (len) {
0783             ret = con->ops->verify_authorizer_reply(con);
0784             if (ret < 0) {
0785                 con->error_msg = "bad authorize reply";
0786                 return ret;
0787             }
0788         }
0789     }
0790 
0791     switch (con->v1.in_reply.tag) {
0792     case CEPH_MSGR_TAG_FEATURES:
0793         pr_err("%s%lld %s feature set mismatch,"
0794                " my %llx < server's %llx, missing %llx\n",
0795                ENTITY_NAME(con->peer_name),
0796                ceph_pr_addr(&con->peer_addr),
0797                sup_feat, server_feat, server_feat & ~sup_feat);
0798         con->error_msg = "missing required protocol features";
0799         return -1;
0800 
0801     case CEPH_MSGR_TAG_BADPROTOVER:
0802         pr_err("%s%lld %s protocol version mismatch,"
0803                " my %d != server's %d\n",
0804                ENTITY_NAME(con->peer_name),
0805                ceph_pr_addr(&con->peer_addr),
0806                le32_to_cpu(con->v1.out_connect.protocol_version),
0807                le32_to_cpu(con->v1.in_reply.protocol_version));
0808         con->error_msg = "protocol version mismatch";
0809         return -1;
0810 
0811     case CEPH_MSGR_TAG_BADAUTHORIZER:
0812         con->v1.auth_retry++;
0813         dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
0814              con->v1.auth_retry);
0815         if (con->v1.auth_retry == 2) {
0816             con->error_msg = "connect authorization failure";
0817             return -1;
0818         }
0819         con_out_kvec_reset(con);
0820         ret = prepare_write_connect(con);
0821         if (ret < 0)
0822             return ret;
0823         prepare_read_connect(con);
0824         break;
0825 
0826     case CEPH_MSGR_TAG_RESETSESSION:
0827         /*
0828          * If we connected with a large connect_seq but the peer
0829          * has no record of a session with us (no connection, or
0830          * connect_seq == 0), they will send RESETSESION to indicate
0831          * that they must have reset their session, and may have
0832          * dropped messages.
0833          */
0834         dout("process_connect got RESET peer seq %u\n",
0835              le32_to_cpu(con->v1.in_reply.connect_seq));
0836         pr_info("%s%lld %s session reset\n",
0837             ENTITY_NAME(con->peer_name),
0838             ceph_pr_addr(&con->peer_addr));
0839         ceph_con_reset_session(con);
0840         con_out_kvec_reset(con);
0841         ret = prepare_write_connect(con);
0842         if (ret < 0)
0843             return ret;
0844         prepare_read_connect(con);
0845 
0846         /* Tell ceph about it. */
0847         mutex_unlock(&con->mutex);
0848         if (con->ops->peer_reset)
0849             con->ops->peer_reset(con);
0850         mutex_lock(&con->mutex);
0851         if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
0852             return -EAGAIN;
0853         break;
0854 
0855     case CEPH_MSGR_TAG_RETRY_SESSION:
0856         /*
0857          * If we sent a smaller connect_seq than the peer has, try
0858          * again with a larger value.
0859          */
0860         dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
0861              le32_to_cpu(con->v1.out_connect.connect_seq),
0862              le32_to_cpu(con->v1.in_reply.connect_seq));
0863         con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
0864         con_out_kvec_reset(con);
0865         ret = prepare_write_connect(con);
0866         if (ret < 0)
0867             return ret;
0868         prepare_read_connect(con);
0869         break;
0870 
0871     case CEPH_MSGR_TAG_RETRY_GLOBAL:
0872         /*
0873          * If we sent a smaller global_seq than the peer has, try
0874          * again with a larger value.
0875          */
0876         dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
0877              con->v1.peer_global_seq,
0878              le32_to_cpu(con->v1.in_reply.global_seq));
0879         ceph_get_global_seq(con->msgr,
0880                     le32_to_cpu(con->v1.in_reply.global_seq));
0881         con_out_kvec_reset(con);
0882         ret = prepare_write_connect(con);
0883         if (ret < 0)
0884             return ret;
0885         prepare_read_connect(con);
0886         break;
0887 
0888     case CEPH_MSGR_TAG_SEQ:
0889     case CEPH_MSGR_TAG_READY:
0890         if (req_feat & ~server_feat) {
0891             pr_err("%s%lld %s protocol feature mismatch,"
0892                    " my required %llx > server's %llx, need %llx\n",
0893                    ENTITY_NAME(con->peer_name),
0894                    ceph_pr_addr(&con->peer_addr),
0895                    req_feat, server_feat, req_feat & ~server_feat);
0896             con->error_msg = "missing required protocol features";
0897             return -1;
0898         }
0899 
0900         WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
0901         con->state = CEPH_CON_S_OPEN;
0902         con->v1.auth_retry = 0;    /* we authenticated; clear flag */
0903         con->v1.peer_global_seq =
0904             le32_to_cpu(con->v1.in_reply.global_seq);
0905         con->v1.connect_seq++;
0906         con->peer_features = server_feat;
0907         dout("process_connect got READY gseq %d cseq %d (%d)\n",
0908              con->v1.peer_global_seq,
0909              le32_to_cpu(con->v1.in_reply.connect_seq),
0910              con->v1.connect_seq);
0911         WARN_ON(con->v1.connect_seq !=
0912             le32_to_cpu(con->v1.in_reply.connect_seq));
0913 
0914         if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
0915             ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
0916 
0917         con->delay = 0;      /* reset backoff memory */
0918 
0919         if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
0920             prepare_write_seq(con);
0921             prepare_read_seq(con);
0922         } else {
0923             prepare_read_tag(con);
0924         }
0925         break;
0926 
0927     case CEPH_MSGR_TAG_WAIT:
0928         /*
0929          * If there is a connection race (we are opening
0930          * connections to each other), one of us may just have
0931          * to WAIT.  This shouldn't happen if we are the
0932          * client.
0933          */
0934         con->error_msg = "protocol error, got WAIT as client";
0935         return -1;
0936 
0937     default:
0938         con->error_msg = "protocol error, garbage tag during connect";
0939         return -1;
0940     }
0941     return 0;
0942 }
0943 
0944 /*
0945  * read (part of) an ack
0946  */
0947 static int read_partial_ack(struct ceph_connection *con)
0948 {
0949     int size = sizeof(con->v1.in_temp_ack);
0950     int end = size;
0951 
0952     return read_partial(con, end, size, &con->v1.in_temp_ack);
0953 }
0954 
0955 /*
0956  * We can finally discard anything that's been acked.
0957  */
0958 static void process_ack(struct ceph_connection *con)
0959 {
0960     u64 ack = le64_to_cpu(con->v1.in_temp_ack);
0961 
0962     if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
0963         ceph_con_discard_sent(con, ack);
0964     else
0965         ceph_con_discard_requeued(con, ack);
0966 
0967     prepare_read_tag(con);
0968 }
0969 
0970 static int read_partial_message_section(struct ceph_connection *con,
0971                     struct kvec *section,
0972                     unsigned int sec_len, u32 *crc)
0973 {
0974     int ret, left;
0975 
0976     BUG_ON(!section);
0977 
0978     while (section->iov_len < sec_len) {
0979         BUG_ON(section->iov_base == NULL);
0980         left = sec_len - section->iov_len;
0981         ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
0982                        section->iov_len, left);
0983         if (ret <= 0)
0984             return ret;
0985         section->iov_len += ret;
0986     }
0987     if (section->iov_len == sec_len)
0988         *crc = crc32c(0, section->iov_base, section->iov_len);
0989 
0990     return 1;
0991 }
0992 
0993 static int read_partial_msg_data(struct ceph_connection *con)
0994 {
0995     struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
0996     bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
0997     struct page *page;
0998     size_t page_offset;
0999     size_t length;
1000     u32 crc = 0;
1001     int ret;
1002 
1003     if (do_datacrc)
1004         crc = con->in_data_crc;
1005     while (cursor->total_resid) {
1006         if (!cursor->resid) {
1007             ceph_msg_data_advance(cursor, 0);
1008             continue;
1009         }
1010 
1011         page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
1012         ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1013         if (ret <= 0) {
1014             if (do_datacrc)
1015                 con->in_data_crc = crc;
1016 
1017             return ret;
1018         }
1019 
1020         if (do_datacrc)
1021             crc = ceph_crc32c_page(crc, page, page_offset, ret);
1022         ceph_msg_data_advance(cursor, (size_t)ret);
1023     }
1024     if (do_datacrc)
1025         con->in_data_crc = crc;
1026 
1027     return 1;   /* must return > 0 to indicate success */
1028 }
1029 
1030 static int read_partial_msg_data_bounce(struct ceph_connection *con)
1031 {
1032     struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1033     struct page *page;
1034     size_t off, len;
1035     u32 crc;
1036     int ret;
1037 
1038     if (unlikely(!con->bounce_page)) {
1039         con->bounce_page = alloc_page(GFP_NOIO);
1040         if (!con->bounce_page) {
1041             pr_err("failed to allocate bounce page\n");
1042             return -ENOMEM;
1043         }
1044     }
1045 
1046     crc = con->in_data_crc;
1047     while (cursor->total_resid) {
1048         if (!cursor->resid) {
1049             ceph_msg_data_advance(cursor, 0);
1050             continue;
1051         }
1052 
1053         page = ceph_msg_data_next(cursor, &off, &len, NULL);
1054         ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
1055         if (ret <= 0) {
1056             con->in_data_crc = crc;
1057             return ret;
1058         }
1059 
1060         crc = crc32c(crc, page_address(con->bounce_page), ret);
1061         memcpy_to_page(page, off, page_address(con->bounce_page), ret);
1062 
1063         ceph_msg_data_advance(cursor, ret);
1064     }
1065     con->in_data_crc = crc;
1066 
1067     return 1;   /* must return > 0 to indicate success */
1068 }
1069 
1070 /*
1071  * read (part of) a message.
1072  */
1073 static int read_partial_message(struct ceph_connection *con)
1074 {
1075     struct ceph_msg *m = con->in_msg;
1076     int size;
1077     int end;
1078     int ret;
1079     unsigned int front_len, middle_len, data_len;
1080     bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1081     bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1082     u64 seq;
1083     u32 crc;
1084 
1085     dout("read_partial_message con %p msg %p\n", con, m);
1086 
1087     /* header */
1088     size = sizeof(con->v1.in_hdr);
1089     end = size;
1090     ret = read_partial(con, end, size, &con->v1.in_hdr);
1091     if (ret <= 0)
1092         return ret;
1093 
1094     crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1095     if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1096         pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1097                crc, con->v1.in_hdr.crc);
1098         return -EBADMSG;
1099     }
1100 
1101     front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1102     if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1103         return -EIO;
1104     middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1105     if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1106         return -EIO;
1107     data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1108     if (data_len > CEPH_MSG_MAX_DATA_LEN)
1109         return -EIO;
1110 
1111     /* verify seq# */
1112     seq = le64_to_cpu(con->v1.in_hdr.seq);
1113     if ((s64)seq - (s64)con->in_seq < 1) {
1114         pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1115             ENTITY_NAME(con->peer_name),
1116             ceph_pr_addr(&con->peer_addr),
1117             seq, con->in_seq + 1);
1118         con->v1.in_base_pos = -front_len - middle_len - data_len -
1119                       sizeof_footer(con);
1120         con->v1.in_tag = CEPH_MSGR_TAG_READY;
1121         return 1;
1122     } else if ((s64)seq - (s64)con->in_seq > 1) {
1123         pr_err("read_partial_message bad seq %lld expected %lld\n",
1124                seq, con->in_seq + 1);
1125         con->error_msg = "bad message sequence # for incoming message";
1126         return -EBADE;
1127     }
1128 
1129     /* allocate message? */
1130     if (!con->in_msg) {
1131         int skip = 0;
1132 
1133         dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1134              front_len, data_len);
1135         ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1136         if (ret < 0)
1137             return ret;
1138 
1139         BUG_ON((!con->in_msg) ^ skip);
1140         if (skip) {
1141             /* skip this message */
1142             dout("alloc_msg said skip message\n");
1143             con->v1.in_base_pos = -front_len - middle_len -
1144                           data_len - sizeof_footer(con);
1145             con->v1.in_tag = CEPH_MSGR_TAG_READY;
1146             con->in_seq++;
1147             return 1;
1148         }
1149 
1150         BUG_ON(!con->in_msg);
1151         BUG_ON(con->in_msg->con != con);
1152         m = con->in_msg;
1153         m->front.iov_len = 0;    /* haven't read it yet */
1154         if (m->middle)
1155             m->middle->vec.iov_len = 0;
1156 
1157         /* prepare for data payload, if any */
1158 
1159         if (data_len)
1160             prepare_message_data(con->in_msg, data_len);
1161     }
1162 
1163     /* front */
1164     ret = read_partial_message_section(con, &m->front, front_len,
1165                        &con->in_front_crc);
1166     if (ret <= 0)
1167         return ret;
1168 
1169     /* middle */
1170     if (m->middle) {
1171         ret = read_partial_message_section(con, &m->middle->vec,
1172                            middle_len,
1173                            &con->in_middle_crc);
1174         if (ret <= 0)
1175             return ret;
1176     }
1177 
1178     /* (page) data */
1179     if (data_len) {
1180         if (!m->num_data_items)
1181             return -EIO;
1182 
1183         if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
1184             ret = read_partial_msg_data_bounce(con);
1185         else
1186             ret = read_partial_msg_data(con);
1187         if (ret <= 0)
1188             return ret;
1189     }
1190 
1191     /* footer */
1192     size = sizeof_footer(con);
1193     end += size;
1194     ret = read_partial(con, end, size, &m->footer);
1195     if (ret <= 0)
1196         return ret;
1197 
1198     if (!need_sign) {
1199         m->footer.flags = m->old_footer.flags;
1200         m->footer.sig = 0;
1201     }
1202 
1203     dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1204          m, front_len, m->footer.front_crc, middle_len,
1205          m->footer.middle_crc, data_len, m->footer.data_crc);
1206 
1207     /* crc ok? */
1208     if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1209         pr_err("read_partial_message %p front crc %u != exp. %u\n",
1210                m, con->in_front_crc, m->footer.front_crc);
1211         return -EBADMSG;
1212     }
1213     if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1214         pr_err("read_partial_message %p middle crc %u != exp %u\n",
1215                m, con->in_middle_crc, m->footer.middle_crc);
1216         return -EBADMSG;
1217     }
1218     if (do_datacrc &&
1219         (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1220         con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1221         pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1222                con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1223         return -EBADMSG;
1224     }
1225 
1226     if (need_sign && con->ops->check_message_signature &&
1227         con->ops->check_message_signature(m)) {
1228         pr_err("read_partial_message %p signature check failed\n", m);
1229         return -EBADMSG;
1230     }
1231 
1232     return 1; /* done! */
1233 }
1234 
1235 static int read_keepalive_ack(struct ceph_connection *con)
1236 {
1237     struct ceph_timespec ceph_ts;
1238     size_t size = sizeof(ceph_ts);
1239     int ret = read_partial(con, size, size, &ceph_ts);
1240     if (ret <= 0)
1241         return ret;
1242     ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1243     prepare_read_tag(con);
1244     return 1;
1245 }
1246 
1247 /*
1248  * Read what we can from the socket.
1249  */
1250 int ceph_con_v1_try_read(struct ceph_connection *con)
1251 {
1252     int ret = -1;
1253 
1254 more:
1255     dout("try_read start %p state %d\n", con, con->state);
1256     if (con->state != CEPH_CON_S_V1_BANNER &&
1257         con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1258         con->state != CEPH_CON_S_OPEN)
1259         return 0;
1260 
1261     BUG_ON(!con->sock);
1262 
1263     dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1264          con->v1.in_base_pos);
1265 
1266     if (con->state == CEPH_CON_S_V1_BANNER) {
1267         ret = read_partial_banner(con);
1268         if (ret <= 0)
1269             goto out;
1270         ret = process_banner(con);
1271         if (ret < 0)
1272             goto out;
1273 
1274         con->state = CEPH_CON_S_V1_CONNECT_MSG;
1275 
1276         /*
1277          * Received banner is good, exchange connection info.
1278          * Do not reset out_kvec, as sending our banner raced
1279          * with receiving peer banner after connect completed.
1280          */
1281         ret = prepare_write_connect(con);
1282         if (ret < 0)
1283             goto out;
1284         prepare_read_connect(con);
1285 
1286         /* Send connection info before awaiting response */
1287         goto out;
1288     }
1289 
1290     if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1291         ret = read_partial_connect(con);
1292         if (ret <= 0)
1293             goto out;
1294         ret = process_connect(con);
1295         if (ret < 0)
1296             goto out;
1297         goto more;
1298     }
1299 
1300     WARN_ON(con->state != CEPH_CON_S_OPEN);
1301 
1302     if (con->v1.in_base_pos < 0) {
1303         /*
1304          * skipping + discarding content.
1305          */
1306         ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1307         if (ret <= 0)
1308             goto out;
1309         dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1310         con->v1.in_base_pos += ret;
1311         if (con->v1.in_base_pos)
1312             goto more;
1313     }
1314     if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1315         /*
1316          * what's next?
1317          */
1318         ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1319         if (ret <= 0)
1320             goto out;
1321         dout("try_read got tag %d\n", con->v1.in_tag);
1322         switch (con->v1.in_tag) {
1323         case CEPH_MSGR_TAG_MSG:
1324             prepare_read_message(con);
1325             break;
1326         case CEPH_MSGR_TAG_ACK:
1327             prepare_read_ack(con);
1328             break;
1329         case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1330             prepare_read_keepalive_ack(con);
1331             break;
1332         case CEPH_MSGR_TAG_CLOSE:
1333             ceph_con_close_socket(con);
1334             con->state = CEPH_CON_S_CLOSED;
1335             goto out;
1336         default:
1337             goto bad_tag;
1338         }
1339     }
1340     if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1341         ret = read_partial_message(con);
1342         if (ret <= 0) {
1343             switch (ret) {
1344             case -EBADMSG:
1345                 con->error_msg = "bad crc/signature";
1346                 fallthrough;
1347             case -EBADE:
1348                 ret = -EIO;
1349                 break;
1350             case -EIO:
1351                 con->error_msg = "io error";
1352                 break;
1353             }
1354             goto out;
1355         }
1356         if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1357             goto more;
1358         ceph_con_process_message(con);
1359         if (con->state == CEPH_CON_S_OPEN)
1360             prepare_read_tag(con);
1361         goto more;
1362     }
1363     if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1364         con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1365         /*
1366          * the final handshake seq exchange is semantically
1367          * equivalent to an ACK
1368          */
1369         ret = read_partial_ack(con);
1370         if (ret <= 0)
1371             goto out;
1372         process_ack(con);
1373         goto more;
1374     }
1375     if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1376         ret = read_keepalive_ack(con);
1377         if (ret <= 0)
1378             goto out;
1379         goto more;
1380     }
1381 
1382 out:
1383     dout("try_read done on %p ret %d\n", con, ret);
1384     return ret;
1385 
1386 bad_tag:
1387     pr_err("try_read bad tag %d\n", con->v1.in_tag);
1388     con->error_msg = "protocol error, garbage tag";
1389     ret = -1;
1390     goto out;
1391 }
1392 
1393 /*
1394  * Write something to the socket.  Called in a worker thread when the
1395  * socket appears to be writeable and we have something ready to send.
1396  */
1397 int ceph_con_v1_try_write(struct ceph_connection *con)
1398 {
1399     int ret = 1;
1400 
1401     dout("try_write start %p state %d\n", con, con->state);
1402     if (con->state != CEPH_CON_S_PREOPEN &&
1403         con->state != CEPH_CON_S_V1_BANNER &&
1404         con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1405         con->state != CEPH_CON_S_OPEN)
1406         return 0;
1407 
1408     /* open the socket first? */
1409     if (con->state == CEPH_CON_S_PREOPEN) {
1410         BUG_ON(con->sock);
1411         con->state = CEPH_CON_S_V1_BANNER;
1412 
1413         con_out_kvec_reset(con);
1414         prepare_write_banner(con);
1415         prepare_read_banner(con);
1416 
1417         BUG_ON(con->in_msg);
1418         con->v1.in_tag = CEPH_MSGR_TAG_READY;
1419         dout("try_write initiating connect on %p new state %d\n",
1420              con, con->state);
1421         ret = ceph_tcp_connect(con);
1422         if (ret < 0) {
1423             con->error_msg = "connect error";
1424             goto out;
1425         }
1426     }
1427 
1428 more:
1429     dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1430     BUG_ON(!con->sock);
1431 
1432     /* kvec data queued? */
1433     if (con->v1.out_kvec_left) {
1434         ret = write_partial_kvec(con);
1435         if (ret <= 0)
1436             goto out;
1437     }
1438     if (con->v1.out_skip) {
1439         ret = write_partial_skip(con);
1440         if (ret <= 0)
1441             goto out;
1442     }
1443 
1444     /* msg pages? */
1445     if (con->out_msg) {
1446         if (con->v1.out_msg_done) {
1447             ceph_msg_put(con->out_msg);
1448             con->out_msg = NULL;   /* we're done with this one */
1449             goto do_next;
1450         }
1451 
1452         ret = write_partial_message_data(con);
1453         if (ret == 1)
1454             goto more;  /* we need to send the footer, too! */
1455         if (ret == 0)
1456             goto out;
1457         if (ret < 0) {
1458             dout("try_write write_partial_message_data err %d\n",
1459                  ret);
1460             goto out;
1461         }
1462     }
1463 
1464 do_next:
1465     if (con->state == CEPH_CON_S_OPEN) {
1466         if (ceph_con_flag_test_and_clear(con,
1467                 CEPH_CON_F_KEEPALIVE_PENDING)) {
1468             prepare_write_keepalive(con);
1469             goto more;
1470         }
1471         /* is anything else pending? */
1472         if (!list_empty(&con->out_queue)) {
1473             prepare_write_message(con);
1474             goto more;
1475         }
1476         if (con->in_seq > con->in_seq_acked) {
1477             prepare_write_ack(con);
1478             goto more;
1479         }
1480     }
1481 
1482     /* Nothing to do! */
1483     ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1484     dout("try_write nothing else to write.\n");
1485     ret = 0;
1486 out:
1487     dout("try_write done on %p ret %d\n", con, ret);
1488     return ret;
1489 }
1490 
1491 void ceph_con_v1_revoke(struct ceph_connection *con)
1492 {
1493     struct ceph_msg *msg = con->out_msg;
1494 
1495     WARN_ON(con->v1.out_skip);
1496     /* footer */
1497     if (con->v1.out_msg_done) {
1498         con->v1.out_skip += con_out_kvec_skip(con);
1499     } else {
1500         WARN_ON(!msg->data_length);
1501         con->v1.out_skip += sizeof_footer(con);
1502     }
1503     /* data, middle, front */
1504     if (msg->data_length)
1505         con->v1.out_skip += msg->cursor.total_resid;
1506     if (msg->middle)
1507         con->v1.out_skip += con_out_kvec_skip(con);
1508     con->v1.out_skip += con_out_kvec_skip(con);
1509 
1510     dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1511          con->v1.out_kvec_bytes, con->v1.out_skip);
1512 }
1513 
1514 void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1515 {
1516     unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1517     unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1518     unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1519 
1520     /* skip rest of message */
1521     con->v1.in_base_pos = con->v1.in_base_pos -
1522             sizeof(struct ceph_msg_header) -
1523             front_len -
1524             middle_len -
1525             data_len -
1526             sizeof(struct ceph_msg_footer);
1527 
1528     con->v1.in_tag = CEPH_MSGR_TAG_READY;
1529     con->in_seq++;
1530 
1531     dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1532 }
1533 
1534 bool ceph_con_v1_opened(struct ceph_connection *con)
1535 {
1536     return con->v1.connect_seq;
1537 }
1538 
1539 void ceph_con_v1_reset_session(struct ceph_connection *con)
1540 {
1541     con->v1.connect_seq = 0;
1542     con->v1.peer_global_seq = 0;
1543 }
1544 
1545 void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1546 {
1547     con->v1.out_skip = 0;
1548 }