Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 
0003 #include <linux/ceph/ceph_debug.h>
0004 
0005 #include <linux/module.h>
0006 #include <linux/err.h>
0007 #include <linux/highmem.h>
0008 #include <linux/mm.h>
0009 #include <linux/pagemap.h>
0010 #include <linux/slab.h>
0011 #include <linux/uaccess.h>
0012 #ifdef CONFIG_BLOCK
0013 #include <linux/bio.h>
0014 #endif
0015 
0016 #include <linux/ceph/ceph_features.h>
0017 #include <linux/ceph/libceph.h>
0018 #include <linux/ceph/osd_client.h>
0019 #include <linux/ceph/messenger.h>
0020 #include <linux/ceph/decode.h>
0021 #include <linux/ceph/auth.h>
0022 #include <linux/ceph/pagelist.h>
0023 #include <linux/ceph/striper.h>
0024 
0025 #define OSD_OPREPLY_FRONT_LEN   512
0026 
0027 static struct kmem_cache    *ceph_osd_request_cache;
0028 
0029 static const struct ceph_connection_operations osd_con_ops;
0030 
0031 /*
0032  * Implement client access to distributed object storage cluster.
0033  *
0034  * All data objects are stored within a cluster/cloud of OSDs, or
0035  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
0036  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
0037  * remote daemons serving up and coordinating consistent and safe
0038  * access to storage.
0039  *
0040  * Cluster membership and the mapping of data objects onto storage devices
0041  * are described by the osd map.
0042  *
0043  * We keep track of pending OSD requests (read, write), resubmit
0044  * requests to different OSDs when the cluster topology/data layout
0045  * change, or retry the affected requests when the communications
0046  * channel with an OSD is reset.
0047  */
0048 
0049 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
0050 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
0051 static void link_linger(struct ceph_osd *osd,
0052             struct ceph_osd_linger_request *lreq);
0053 static void unlink_linger(struct ceph_osd *osd,
0054               struct ceph_osd_linger_request *lreq);
0055 static void clear_backoffs(struct ceph_osd *osd);
0056 
0057 #if 1
0058 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
0059 {
0060     bool wrlocked = true;
0061 
0062     if (unlikely(down_read_trylock(sem))) {
0063         wrlocked = false;
0064         up_read(sem);
0065     }
0066 
0067     return wrlocked;
0068 }
0069 static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
0070 {
0071     WARN_ON(!rwsem_is_locked(&osdc->lock));
0072 }
0073 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
0074 {
0075     WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
0076 }
0077 static inline void verify_osd_locked(struct ceph_osd *osd)
0078 {
0079     struct ceph_osd_client *osdc = osd->o_osdc;
0080 
0081     WARN_ON(!(mutex_is_locked(&osd->lock) &&
0082           rwsem_is_locked(&osdc->lock)) &&
0083         !rwsem_is_wrlocked(&osdc->lock));
0084 }
0085 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
0086 {
0087     WARN_ON(!mutex_is_locked(&lreq->lock));
0088 }
0089 #else
0090 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
0091 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
0092 static inline void verify_osd_locked(struct ceph_osd *osd) { }
0093 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
0094 #endif
0095 
0096 /*
0097  * calculate the mapping of a file extent onto an object, and fill out the
0098  * request accordingly.  shorten extent as necessary if it crosses an
0099  * object boundary.
0100  *
0101  * fill osd op in request message.
0102  */
0103 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
0104             u64 *objnum, u64 *objoff, u64 *objlen)
0105 {
0106     u64 orig_len = *plen;
0107     u32 xlen;
0108 
0109     /* object extent? */
0110     ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
0111                       objoff, &xlen);
0112     *objlen = xlen;
0113     if (*objlen < orig_len) {
0114         *plen = *objlen;
0115         dout(" skipping last %llu, final file extent %llu~%llu\n",
0116              orig_len - *plen, off, *plen);
0117     }
0118 
0119     dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
0120     return 0;
0121 }
0122 
0123 static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
0124 {
0125     memset(osd_data, 0, sizeof (*osd_data));
0126     osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
0127 }
0128 
0129 /*
0130  * Consumes @pages if @own_pages is true.
0131  */
0132 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
0133             struct page **pages, u64 length, u32 alignment,
0134             bool pages_from_pool, bool own_pages)
0135 {
0136     osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
0137     osd_data->pages = pages;
0138     osd_data->length = length;
0139     osd_data->alignment = alignment;
0140     osd_data->pages_from_pool = pages_from_pool;
0141     osd_data->own_pages = own_pages;
0142 }
0143 
0144 /*
0145  * Consumes a ref on @pagelist.
0146  */
0147 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
0148             struct ceph_pagelist *pagelist)
0149 {
0150     osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
0151     osd_data->pagelist = pagelist;
0152 }
0153 
0154 #ifdef CONFIG_BLOCK
0155 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
0156                    struct ceph_bio_iter *bio_pos,
0157                    u32 bio_length)
0158 {
0159     osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
0160     osd_data->bio_pos = *bio_pos;
0161     osd_data->bio_length = bio_length;
0162 }
0163 #endif /* CONFIG_BLOCK */
0164 
0165 static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
0166                      struct ceph_bvec_iter *bvec_pos,
0167                      u32 num_bvecs)
0168 {
0169     osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
0170     osd_data->bvec_pos = *bvec_pos;
0171     osd_data->num_bvecs = num_bvecs;
0172 }
0173 
0174 static struct ceph_osd_data *
0175 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
0176 {
0177     BUG_ON(which >= osd_req->r_num_ops);
0178 
0179     return &osd_req->r_ops[which].raw_data_in;
0180 }
0181 
0182 struct ceph_osd_data *
0183 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
0184             unsigned int which)
0185 {
0186     return osd_req_op_data(osd_req, which, extent, osd_data);
0187 }
0188 EXPORT_SYMBOL(osd_req_op_extent_osd_data);
0189 
0190 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
0191             unsigned int which, struct page **pages,
0192             u64 length, u32 alignment,
0193             bool pages_from_pool, bool own_pages)
0194 {
0195     struct ceph_osd_data *osd_data;
0196 
0197     osd_data = osd_req_op_raw_data_in(osd_req, which);
0198     ceph_osd_data_pages_init(osd_data, pages, length, alignment,
0199                 pages_from_pool, own_pages);
0200 }
0201 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
0202 
0203 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
0204             unsigned int which, struct page **pages,
0205             u64 length, u32 alignment,
0206             bool pages_from_pool, bool own_pages)
0207 {
0208     struct ceph_osd_data *osd_data;
0209 
0210     osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
0211     ceph_osd_data_pages_init(osd_data, pages, length, alignment,
0212                 pages_from_pool, own_pages);
0213 }
0214 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
0215 
0216 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
0217             unsigned int which, struct ceph_pagelist *pagelist)
0218 {
0219     struct ceph_osd_data *osd_data;
0220 
0221     osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
0222     ceph_osd_data_pagelist_init(osd_data, pagelist);
0223 }
0224 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
0225 
0226 #ifdef CONFIG_BLOCK
0227 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
0228                     unsigned int which,
0229                     struct ceph_bio_iter *bio_pos,
0230                     u32 bio_length)
0231 {
0232     struct ceph_osd_data *osd_data;
0233 
0234     osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
0235     ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
0236 }
0237 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
0238 #endif /* CONFIG_BLOCK */
0239 
0240 void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
0241                       unsigned int which,
0242                       struct bio_vec *bvecs, u32 num_bvecs,
0243                       u32 bytes)
0244 {
0245     struct ceph_osd_data *osd_data;
0246     struct ceph_bvec_iter it = {
0247         .bvecs = bvecs,
0248         .iter = { .bi_size = bytes },
0249     };
0250 
0251     osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
0252     ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
0253 }
0254 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);
0255 
0256 void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
0257                      unsigned int which,
0258                      struct ceph_bvec_iter *bvec_pos)
0259 {
0260     struct ceph_osd_data *osd_data;
0261 
0262     osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
0263     ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
0264 }
0265 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
0266 
0267 static void osd_req_op_cls_request_info_pagelist(
0268             struct ceph_osd_request *osd_req,
0269             unsigned int which, struct ceph_pagelist *pagelist)
0270 {
0271     struct ceph_osd_data *osd_data;
0272 
0273     osd_data = osd_req_op_data(osd_req, which, cls, request_info);
0274     ceph_osd_data_pagelist_init(osd_data, pagelist);
0275 }
0276 
0277 void osd_req_op_cls_request_data_pagelist(
0278             struct ceph_osd_request *osd_req,
0279             unsigned int which, struct ceph_pagelist *pagelist)
0280 {
0281     struct ceph_osd_data *osd_data;
0282 
0283     osd_data = osd_req_op_data(osd_req, which, cls, request_data);
0284     ceph_osd_data_pagelist_init(osd_data, pagelist);
0285     osd_req->r_ops[which].cls.indata_len += pagelist->length;
0286     osd_req->r_ops[which].indata_len += pagelist->length;
0287 }
0288 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
0289 
0290 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
0291             unsigned int which, struct page **pages, u64 length,
0292             u32 alignment, bool pages_from_pool, bool own_pages)
0293 {
0294     struct ceph_osd_data *osd_data;
0295 
0296     osd_data = osd_req_op_data(osd_req, which, cls, request_data);
0297     ceph_osd_data_pages_init(osd_data, pages, length, alignment,
0298                 pages_from_pool, own_pages);
0299     osd_req->r_ops[which].cls.indata_len += length;
0300     osd_req->r_ops[which].indata_len += length;
0301 }
0302 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
0303 
0304 void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
0305                        unsigned int which,
0306                        struct bio_vec *bvecs, u32 num_bvecs,
0307                        u32 bytes)
0308 {
0309     struct ceph_osd_data *osd_data;
0310     struct ceph_bvec_iter it = {
0311         .bvecs = bvecs,
0312         .iter = { .bi_size = bytes },
0313     };
0314 
0315     osd_data = osd_req_op_data(osd_req, which, cls, request_data);
0316     ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
0317     osd_req->r_ops[which].cls.indata_len += bytes;
0318     osd_req->r_ops[which].indata_len += bytes;
0319 }
0320 EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
0321 
0322 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
0323             unsigned int which, struct page **pages, u64 length,
0324             u32 alignment, bool pages_from_pool, bool own_pages)
0325 {
0326     struct ceph_osd_data *osd_data;
0327 
0328     osd_data = osd_req_op_data(osd_req, which, cls, response_data);
0329     ceph_osd_data_pages_init(osd_data, pages, length, alignment,
0330                 pages_from_pool, own_pages);
0331 }
0332 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
0333 
0334 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
0335 {
0336     switch (osd_data->type) {
0337     case CEPH_OSD_DATA_TYPE_NONE:
0338         return 0;
0339     case CEPH_OSD_DATA_TYPE_PAGES:
0340         return osd_data->length;
0341     case CEPH_OSD_DATA_TYPE_PAGELIST:
0342         return (u64)osd_data->pagelist->length;
0343 #ifdef CONFIG_BLOCK
0344     case CEPH_OSD_DATA_TYPE_BIO:
0345         return (u64)osd_data->bio_length;
0346 #endif /* CONFIG_BLOCK */
0347     case CEPH_OSD_DATA_TYPE_BVECS:
0348         return osd_data->bvec_pos.iter.bi_size;
0349     default:
0350         WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
0351         return 0;
0352     }
0353 }
0354 
0355 static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
0356 {
0357     if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
0358         int num_pages;
0359 
0360         num_pages = calc_pages_for((u64)osd_data->alignment,
0361                         (u64)osd_data->length);
0362         ceph_release_page_vector(osd_data->pages, num_pages);
0363     } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
0364         ceph_pagelist_release(osd_data->pagelist);
0365     }
0366     ceph_osd_data_init(osd_data);
0367 }
0368 
0369 static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
0370             unsigned int which)
0371 {
0372     struct ceph_osd_req_op *op;
0373 
0374     BUG_ON(which >= osd_req->r_num_ops);
0375     op = &osd_req->r_ops[which];
0376 
0377     switch (op->op) {
0378     case CEPH_OSD_OP_READ:
0379     case CEPH_OSD_OP_WRITE:
0380     case CEPH_OSD_OP_WRITEFULL:
0381         ceph_osd_data_release(&op->extent.osd_data);
0382         break;
0383     case CEPH_OSD_OP_CALL:
0384         ceph_osd_data_release(&op->cls.request_info);
0385         ceph_osd_data_release(&op->cls.request_data);
0386         ceph_osd_data_release(&op->cls.response_data);
0387         break;
0388     case CEPH_OSD_OP_SETXATTR:
0389     case CEPH_OSD_OP_CMPXATTR:
0390         ceph_osd_data_release(&op->xattr.osd_data);
0391         break;
0392     case CEPH_OSD_OP_STAT:
0393         ceph_osd_data_release(&op->raw_data_in);
0394         break;
0395     case CEPH_OSD_OP_NOTIFY_ACK:
0396         ceph_osd_data_release(&op->notify_ack.request_data);
0397         break;
0398     case CEPH_OSD_OP_NOTIFY:
0399         ceph_osd_data_release(&op->notify.request_data);
0400         ceph_osd_data_release(&op->notify.response_data);
0401         break;
0402     case CEPH_OSD_OP_LIST_WATCHERS:
0403         ceph_osd_data_release(&op->list_watchers.response_data);
0404         break;
0405     case CEPH_OSD_OP_COPY_FROM2:
0406         ceph_osd_data_release(&op->copy_from.osd_data);
0407         break;
0408     default:
0409         break;
0410     }
0411 }
0412 
0413 /*
0414  * Assumes @t is zero-initialized.
0415  */
0416 static void target_init(struct ceph_osd_request_target *t)
0417 {
0418     ceph_oid_init(&t->base_oid);
0419     ceph_oloc_init(&t->base_oloc);
0420     ceph_oid_init(&t->target_oid);
0421     ceph_oloc_init(&t->target_oloc);
0422 
0423     ceph_osds_init(&t->acting);
0424     ceph_osds_init(&t->up);
0425     t->size = -1;
0426     t->min_size = -1;
0427 
0428     t->osd = CEPH_HOMELESS_OSD;
0429 }
0430 
0431 static void target_copy(struct ceph_osd_request_target *dest,
0432             const struct ceph_osd_request_target *src)
0433 {
0434     ceph_oid_copy(&dest->base_oid, &src->base_oid);
0435     ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
0436     ceph_oid_copy(&dest->target_oid, &src->target_oid);
0437     ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
0438 
0439     dest->pgid = src->pgid; /* struct */
0440     dest->spgid = src->spgid; /* struct */
0441     dest->pg_num = src->pg_num;
0442     dest->pg_num_mask = src->pg_num_mask;
0443     ceph_osds_copy(&dest->acting, &src->acting);
0444     ceph_osds_copy(&dest->up, &src->up);
0445     dest->size = src->size;
0446     dest->min_size = src->min_size;
0447     dest->sort_bitwise = src->sort_bitwise;
0448     dest->recovery_deletes = src->recovery_deletes;
0449 
0450     dest->flags = src->flags;
0451     dest->used_replica = src->used_replica;
0452     dest->paused = src->paused;
0453 
0454     dest->epoch = src->epoch;
0455     dest->last_force_resend = src->last_force_resend;
0456 
0457     dest->osd = src->osd;
0458 }
0459 
0460 static void target_destroy(struct ceph_osd_request_target *t)
0461 {
0462     ceph_oid_destroy(&t->base_oid);
0463     ceph_oloc_destroy(&t->base_oloc);
0464     ceph_oid_destroy(&t->target_oid);
0465     ceph_oloc_destroy(&t->target_oloc);
0466 }
0467 
0468 /*
0469  * requests
0470  */
0471 static void request_release_checks(struct ceph_osd_request *req)
0472 {
0473     WARN_ON(!RB_EMPTY_NODE(&req->r_node));
0474     WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
0475     WARN_ON(!list_empty(&req->r_private_item));
0476     WARN_ON(req->r_osd);
0477 }
0478 
0479 static void ceph_osdc_release_request(struct kref *kref)
0480 {
0481     struct ceph_osd_request *req = container_of(kref,
0482                         struct ceph_osd_request, r_kref);
0483     unsigned int which;
0484 
0485     dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
0486          req->r_request, req->r_reply);
0487     request_release_checks(req);
0488 
0489     if (req->r_request)
0490         ceph_msg_put(req->r_request);
0491     if (req->r_reply)
0492         ceph_msg_put(req->r_reply);
0493 
0494     for (which = 0; which < req->r_num_ops; which++)
0495         osd_req_op_data_release(req, which);
0496 
0497     target_destroy(&req->r_t);
0498     ceph_put_snap_context(req->r_snapc);
0499 
0500     if (req->r_mempool)
0501         mempool_free(req, req->r_osdc->req_mempool);
0502     else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
0503         kmem_cache_free(ceph_osd_request_cache, req);
0504     else
0505         kfree(req);
0506 }
0507 
0508 void ceph_osdc_get_request(struct ceph_osd_request *req)
0509 {
0510     dout("%s %p (was %d)\n", __func__, req,
0511          kref_read(&req->r_kref));
0512     kref_get(&req->r_kref);
0513 }
0514 EXPORT_SYMBOL(ceph_osdc_get_request);
0515 
0516 void ceph_osdc_put_request(struct ceph_osd_request *req)
0517 {
0518     if (req) {
0519         dout("%s %p (was %d)\n", __func__, req,
0520              kref_read(&req->r_kref));
0521         kref_put(&req->r_kref, ceph_osdc_release_request);
0522     }
0523 }
0524 EXPORT_SYMBOL(ceph_osdc_put_request);
0525 
0526 static void request_init(struct ceph_osd_request *req)
0527 {
0528     /* req only, each op is zeroed in osd_req_op_init() */
0529     memset(req, 0, sizeof(*req));
0530 
0531     kref_init(&req->r_kref);
0532     init_completion(&req->r_completion);
0533     RB_CLEAR_NODE(&req->r_node);
0534     RB_CLEAR_NODE(&req->r_mc_node);
0535     INIT_LIST_HEAD(&req->r_private_item);
0536 
0537     target_init(&req->r_t);
0538 }
0539 
0540 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
0541                            struct ceph_snap_context *snapc,
0542                            unsigned int num_ops,
0543                            bool use_mempool,
0544                            gfp_t gfp_flags)
0545 {
0546     struct ceph_osd_request *req;
0547 
0548     if (use_mempool) {
0549         BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
0550         req = mempool_alloc(osdc->req_mempool, gfp_flags);
0551     } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
0552         req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
0553     } else {
0554         BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
0555         req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
0556     }
0557     if (unlikely(!req))
0558         return NULL;
0559 
0560     request_init(req);
0561     req->r_osdc = osdc;
0562     req->r_mempool = use_mempool;
0563     req->r_num_ops = num_ops;
0564     req->r_snapid = CEPH_NOSNAP;
0565     req->r_snapc = ceph_get_snap_context(snapc);
0566 
0567     dout("%s req %p\n", __func__, req);
0568     return req;
0569 }
0570 EXPORT_SYMBOL(ceph_osdc_alloc_request);
0571 
0572 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
0573 {
0574     return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
0575 }
0576 
0577 static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
0578                       int num_request_data_items,
0579                       int num_reply_data_items)
0580 {
0581     struct ceph_osd_client *osdc = req->r_osdc;
0582     struct ceph_msg *msg;
0583     int msg_size;
0584 
0585     WARN_ON(req->r_request || req->r_reply);
0586     WARN_ON(ceph_oid_empty(&req->r_base_oid));
0587     WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
0588 
0589     /* create request message */
0590     msg_size = CEPH_ENCODING_START_BLK_LEN +
0591             CEPH_PGID_ENCODING_LEN + 1; /* spgid */
0592     msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
0593     msg_size += CEPH_ENCODING_START_BLK_LEN +
0594             sizeof(struct ceph_osd_reqid); /* reqid */
0595     msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
0596     msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
0597     msg_size += CEPH_ENCODING_START_BLK_LEN +
0598             ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
0599     msg_size += 4 + req->r_base_oid.name_len; /* oid */
0600     msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
0601     msg_size += 8; /* snapid */
0602     msg_size += 8; /* snap_seq */
0603     msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
0604     msg_size += 4 + 8; /* retry_attempt, features */
0605 
0606     if (req->r_mempool)
0607         msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
0608                        num_request_data_items);
0609     else
0610         msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
0611                     num_request_data_items, gfp, true);
0612     if (!msg)
0613         return -ENOMEM;
0614 
0615     memset(msg->front.iov_base, 0, msg->front.iov_len);
0616     req->r_request = msg;
0617 
0618     /* create reply message */
0619     msg_size = OSD_OPREPLY_FRONT_LEN;
0620     msg_size += req->r_base_oid.name_len;
0621     msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
0622 
0623     if (req->r_mempool)
0624         msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
0625                        num_reply_data_items);
0626     else
0627         msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
0628                     num_reply_data_items, gfp, true);
0629     if (!msg)
0630         return -ENOMEM;
0631 
0632     req->r_reply = msg;
0633 
0634     return 0;
0635 }
0636 
0637 static bool osd_req_opcode_valid(u16 opcode)
0638 {
0639     switch (opcode) {
0640 #define GENERATE_CASE(op, opcode, str)  case CEPH_OSD_OP_##op: return true;
0641 __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
0642 #undef GENERATE_CASE
0643     default:
0644         return false;
0645     }
0646 }
0647 
0648 static void get_num_data_items(struct ceph_osd_request *req,
0649                    int *num_request_data_items,
0650                    int *num_reply_data_items)
0651 {
0652     struct ceph_osd_req_op *op;
0653 
0654     *num_request_data_items = 0;
0655     *num_reply_data_items = 0;
0656 
0657     for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
0658         switch (op->op) {
0659         /* request */
0660         case CEPH_OSD_OP_WRITE:
0661         case CEPH_OSD_OP_WRITEFULL:
0662         case CEPH_OSD_OP_SETXATTR:
0663         case CEPH_OSD_OP_CMPXATTR:
0664         case CEPH_OSD_OP_NOTIFY_ACK:
0665         case CEPH_OSD_OP_COPY_FROM2:
0666             *num_request_data_items += 1;
0667             break;
0668 
0669         /* reply */
0670         case CEPH_OSD_OP_STAT:
0671         case CEPH_OSD_OP_READ:
0672         case CEPH_OSD_OP_LIST_WATCHERS:
0673             *num_reply_data_items += 1;
0674             break;
0675 
0676         /* both */
0677         case CEPH_OSD_OP_NOTIFY:
0678             *num_request_data_items += 1;
0679             *num_reply_data_items += 1;
0680             break;
0681         case CEPH_OSD_OP_CALL:
0682             *num_request_data_items += 2;
0683             *num_reply_data_items += 1;
0684             break;
0685 
0686         default:
0687             WARN_ON(!osd_req_opcode_valid(op->op));
0688             break;
0689         }
0690     }
0691 }
0692 
0693 /*
0694  * oid, oloc and OSD op opcode(s) must be filled in before this function
0695  * is called.
0696  */
0697 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
0698 {
0699     int num_request_data_items, num_reply_data_items;
0700 
0701     get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
0702     return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
0703                       num_reply_data_items);
0704 }
0705 EXPORT_SYMBOL(ceph_osdc_alloc_messages);
0706 
0707 /*
0708  * This is an osd op init function for opcodes that have no data or
0709  * other information associated with them.  It also serves as a
0710  * common init routine for all the other init functions, below.
0711  */
0712 struct ceph_osd_req_op *
0713 osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
0714          u16 opcode, u32 flags)
0715 {
0716     struct ceph_osd_req_op *op;
0717 
0718     BUG_ON(which >= osd_req->r_num_ops);
0719     BUG_ON(!osd_req_opcode_valid(opcode));
0720 
0721     op = &osd_req->r_ops[which];
0722     memset(op, 0, sizeof (*op));
0723     op->op = opcode;
0724     op->flags = flags;
0725 
0726     return op;
0727 }
0728 EXPORT_SYMBOL(osd_req_op_init);
0729 
0730 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
0731                 unsigned int which, u16 opcode,
0732                 u64 offset, u64 length,
0733                 u64 truncate_size, u32 truncate_seq)
0734 {
0735     struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
0736                              opcode, 0);
0737     size_t payload_len = 0;
0738 
0739     BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
0740            opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
0741            opcode != CEPH_OSD_OP_TRUNCATE);
0742 
0743     op->extent.offset = offset;
0744     op->extent.length = length;
0745     op->extent.truncate_size = truncate_size;
0746     op->extent.truncate_seq = truncate_seq;
0747     if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
0748         payload_len += length;
0749 
0750     op->indata_len = payload_len;
0751 }
0752 EXPORT_SYMBOL(osd_req_op_extent_init);
0753 
0754 void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
0755                 unsigned int which, u64 length)
0756 {
0757     struct ceph_osd_req_op *op;
0758     u64 previous;
0759 
0760     BUG_ON(which >= osd_req->r_num_ops);
0761     op = &osd_req->r_ops[which];
0762     previous = op->extent.length;
0763 
0764     if (length == previous)
0765         return;     /* Nothing to do */
0766     BUG_ON(length > previous);
0767 
0768     op->extent.length = length;
0769     if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
0770         op->indata_len -= previous - length;
0771 }
0772 EXPORT_SYMBOL(osd_req_op_extent_update);
0773 
0774 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
0775                 unsigned int which, u64 offset_inc)
0776 {
0777     struct ceph_osd_req_op *op, *prev_op;
0778 
0779     BUG_ON(which + 1 >= osd_req->r_num_ops);
0780 
0781     prev_op = &osd_req->r_ops[which];
0782     op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
0783     /* dup previous one */
0784     op->indata_len = prev_op->indata_len;
0785     op->outdata_len = prev_op->outdata_len;
0786     op->extent = prev_op->extent;
0787     /* adjust offset */
0788     op->extent.offset += offset_inc;
0789     op->extent.length -= offset_inc;
0790 
0791     if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
0792         op->indata_len -= offset_inc;
0793 }
0794 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
0795 
0796 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
0797             const char *class, const char *method)
0798 {
0799     struct ceph_osd_req_op *op;
0800     struct ceph_pagelist *pagelist;
0801     size_t payload_len = 0;
0802     size_t size;
0803     int ret;
0804 
0805     op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
0806 
0807     pagelist = ceph_pagelist_alloc(GFP_NOFS);
0808     if (!pagelist)
0809         return -ENOMEM;
0810 
0811     op->cls.class_name = class;
0812     size = strlen(class);
0813     BUG_ON(size > (size_t) U8_MAX);
0814     op->cls.class_len = size;
0815     ret = ceph_pagelist_append(pagelist, class, size);
0816     if (ret)
0817         goto err_pagelist_free;
0818     payload_len += size;
0819 
0820     op->cls.method_name = method;
0821     size = strlen(method);
0822     BUG_ON(size > (size_t) U8_MAX);
0823     op->cls.method_len = size;
0824     ret = ceph_pagelist_append(pagelist, method, size);
0825     if (ret)
0826         goto err_pagelist_free;
0827     payload_len += size;
0828 
0829     osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
0830     op->indata_len = payload_len;
0831     return 0;
0832 
0833 err_pagelist_free:
0834     ceph_pagelist_release(pagelist);
0835     return ret;
0836 }
0837 EXPORT_SYMBOL(osd_req_op_cls_init);
0838 
0839 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
0840               u16 opcode, const char *name, const void *value,
0841               size_t size, u8 cmp_op, u8 cmp_mode)
0842 {
0843     struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
0844                              opcode, 0);
0845     struct ceph_pagelist *pagelist;
0846     size_t payload_len;
0847     int ret;
0848 
0849     BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
0850 
0851     pagelist = ceph_pagelist_alloc(GFP_NOFS);
0852     if (!pagelist)
0853         return -ENOMEM;
0854 
0855     payload_len = strlen(name);
0856     op->xattr.name_len = payload_len;
0857     ret = ceph_pagelist_append(pagelist, name, payload_len);
0858     if (ret)
0859         goto err_pagelist_free;
0860 
0861     op->xattr.value_len = size;
0862     ret = ceph_pagelist_append(pagelist, value, size);
0863     if (ret)
0864         goto err_pagelist_free;
0865     payload_len += size;
0866 
0867     op->xattr.cmp_op = cmp_op;
0868     op->xattr.cmp_mode = cmp_mode;
0869 
0870     ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
0871     op->indata_len = payload_len;
0872     return 0;
0873 
0874 err_pagelist_free:
0875     ceph_pagelist_release(pagelist);
0876     return ret;
0877 }
0878 EXPORT_SYMBOL(osd_req_op_xattr_init);
0879 
0880 /*
0881  * @watch_opcode: CEPH_OSD_WATCH_OP_*
0882  */
0883 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
0884                   u8 watch_opcode, u64 cookie, u32 gen)
0885 {
0886     struct ceph_osd_req_op *op;
0887 
0888     op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
0889     op->watch.cookie = cookie;
0890     op->watch.op = watch_opcode;
0891     op->watch.gen = gen;
0892 }
0893 
0894 /*
0895  * prot_ver, timeout and notify payload (may be empty) should already be
0896  * encoded in @request_pl
0897  */
0898 static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
0899                    u64 cookie, struct ceph_pagelist *request_pl)
0900 {
0901     struct ceph_osd_req_op *op;
0902 
0903     op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
0904     op->notify.cookie = cookie;
0905 
0906     ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
0907     op->indata_len = request_pl->length;
0908 }
0909 
0910 /*
0911  * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
0912  */
0913 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
0914                 unsigned int which,
0915                 u64 expected_object_size,
0916                 u64 expected_write_size,
0917                 u32 flags)
0918 {
0919     struct ceph_osd_req_op *op;
0920 
0921     op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
0922     op->alloc_hint.expected_object_size = expected_object_size;
0923     op->alloc_hint.expected_write_size = expected_write_size;
0924     op->alloc_hint.flags = flags;
0925 
0926     /*
0927      * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
0928      * not worth a feature bit.  Set FAILOK per-op flag to make
0929      * sure older osds don't trip over an unsupported opcode.
0930      */
0931     op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
0932 }
0933 EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
0934 
0935 static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
0936                 struct ceph_osd_data *osd_data)
0937 {
0938     u64 length = ceph_osd_data_length(osd_data);
0939 
0940     if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
0941         BUG_ON(length > (u64) SIZE_MAX);
0942         if (length)
0943             ceph_msg_data_add_pages(msg, osd_data->pages,
0944                     length, osd_data->alignment, false);
0945     } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
0946         BUG_ON(!length);
0947         ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
0948 #ifdef CONFIG_BLOCK
0949     } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
0950         ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
0951 #endif
0952     } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
0953         ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
0954     } else {
0955         BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
0956     }
0957 }
0958 
0959 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
0960                  const struct ceph_osd_req_op *src)
0961 {
0962     switch (src->op) {
0963     case CEPH_OSD_OP_STAT:
0964         break;
0965     case CEPH_OSD_OP_READ:
0966     case CEPH_OSD_OP_WRITE:
0967     case CEPH_OSD_OP_WRITEFULL:
0968     case CEPH_OSD_OP_ZERO:
0969     case CEPH_OSD_OP_TRUNCATE:
0970         dst->extent.offset = cpu_to_le64(src->extent.offset);
0971         dst->extent.length = cpu_to_le64(src->extent.length);
0972         dst->extent.truncate_size =
0973             cpu_to_le64(src->extent.truncate_size);
0974         dst->extent.truncate_seq =
0975             cpu_to_le32(src->extent.truncate_seq);
0976         break;
0977     case CEPH_OSD_OP_CALL:
0978         dst->cls.class_len = src->cls.class_len;
0979         dst->cls.method_len = src->cls.method_len;
0980         dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
0981         break;
0982     case CEPH_OSD_OP_WATCH:
0983         dst->watch.cookie = cpu_to_le64(src->watch.cookie);
0984         dst->watch.ver = cpu_to_le64(0);
0985         dst->watch.op = src->watch.op;
0986         dst->watch.gen = cpu_to_le32(src->watch.gen);
0987         break;
0988     case CEPH_OSD_OP_NOTIFY_ACK:
0989         break;
0990     case CEPH_OSD_OP_NOTIFY:
0991         dst->notify.cookie = cpu_to_le64(src->notify.cookie);
0992         break;
0993     case CEPH_OSD_OP_LIST_WATCHERS:
0994         break;
0995     case CEPH_OSD_OP_SETALLOCHINT:
0996         dst->alloc_hint.expected_object_size =
0997             cpu_to_le64(src->alloc_hint.expected_object_size);
0998         dst->alloc_hint.expected_write_size =
0999             cpu_to_le64(src->alloc_hint.expected_write_size);
1000         dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
1001         break;
1002     case CEPH_OSD_OP_SETXATTR:
1003     case CEPH_OSD_OP_CMPXATTR:
1004         dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
1005         dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
1006         dst->xattr.cmp_op = src->xattr.cmp_op;
1007         dst->xattr.cmp_mode = src->xattr.cmp_mode;
1008         break;
1009     case CEPH_OSD_OP_CREATE:
1010     case CEPH_OSD_OP_DELETE:
1011         break;
1012     case CEPH_OSD_OP_COPY_FROM2:
1013         dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1014         dst->copy_from.src_version =
1015             cpu_to_le64(src->copy_from.src_version);
1016         dst->copy_from.flags = src->copy_from.flags;
1017         dst->copy_from.src_fadvise_flags =
1018             cpu_to_le32(src->copy_from.src_fadvise_flags);
1019         break;
1020     default:
1021         pr_err("unsupported osd opcode %s\n",
1022             ceph_osd_op_name(src->op));
1023         WARN_ON(1);
1024 
1025         return 0;
1026     }
1027 
1028     dst->op = cpu_to_le16(src->op);
1029     dst->flags = cpu_to_le32(src->flags);
1030     dst->payload_len = cpu_to_le32(src->indata_len);
1031 
1032     return src->indata_len;
1033 }
1034 
1035 /*
1036  * build new request AND message, calculate layout, and adjust file
1037  * extent as needed.
1038  *
1039  * if the file was recently truncated, we include information about its
1040  * old and new size so that the object can be updated appropriately.  (we
1041  * avoid synchronously deleting truncated objects because it's slow.)
1042  */
1043 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1044                            struct ceph_file_layout *layout,
1045                            struct ceph_vino vino,
1046                            u64 off, u64 *plen,
1047                            unsigned int which, int num_ops,
1048                            int opcode, int flags,
1049                            struct ceph_snap_context *snapc,
1050                            u32 truncate_seq,
1051                            u64 truncate_size,
1052                            bool use_mempool)
1053 {
1054     struct ceph_osd_request *req;
1055     u64 objnum = 0;
1056     u64 objoff = 0;
1057     u64 objlen = 0;
1058     int r;
1059 
1060     BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
1061            opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
1062            opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
1063 
1064     req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
1065                     GFP_NOFS);
1066     if (!req) {
1067         r = -ENOMEM;
1068         goto fail;
1069     }
1070 
1071     /* calculate max write size */
1072     r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
1073     if (r)
1074         goto fail;
1075 
1076     if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
1077         osd_req_op_init(req, which, opcode, 0);
1078     } else {
1079         u32 object_size = layout->object_size;
1080         u32 object_base = off - objoff;
1081         if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
1082             if (truncate_size <= object_base) {
1083                 truncate_size = 0;
1084             } else {
1085                 truncate_size -= object_base;
1086                 if (truncate_size > object_size)
1087                     truncate_size = object_size;
1088             }
1089         }
1090         osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1091                        truncate_size, truncate_seq);
1092     }
1093 
1094     req->r_base_oloc.pool = layout->pool_id;
1095     req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1096     ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1097     req->r_flags = flags | osdc->client->options->read_from_replica;
1098 
1099     req->r_snapid = vino.snap;
1100     if (flags & CEPH_OSD_FLAG_WRITE)
1101         req->r_data_offset = off;
1102 
1103     if (num_ops > 1)
1104         /*
1105          * This is a special case for ceph_writepages_start(), but it
1106          * also covers ceph_uninline_data().  If more multi-op request
1107          * use cases emerge, we will need a separate helper.
1108          */
1109         r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
1110     else
1111         r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1112     if (r)
1113         goto fail;
1114 
1115     return req;
1116 
1117 fail:
1118     ceph_osdc_put_request(req);
1119     return ERR_PTR(r);
1120 }
1121 EXPORT_SYMBOL(ceph_osdc_new_request);
1122 
1123 /*
1124  * We keep osd requests in an rbtree, sorted by ->r_tid.
1125  */
1126 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
1127 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
1128 
1129 /*
1130  * Call @fn on each OSD request as long as @fn returns 0.
1131  */
1132 static void for_each_request(struct ceph_osd_client *osdc,
1133             int (*fn)(struct ceph_osd_request *req, void *arg),
1134             void *arg)
1135 {
1136     struct rb_node *n, *p;
1137 
1138     for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1139         struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1140 
1141         for (p = rb_first(&osd->o_requests); p; ) {
1142             struct ceph_osd_request *req =
1143                 rb_entry(p, struct ceph_osd_request, r_node);
1144 
1145             p = rb_next(p);
1146             if (fn(req, arg))
1147                 return;
1148         }
1149     }
1150 
1151     for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
1152         struct ceph_osd_request *req =
1153             rb_entry(p, struct ceph_osd_request, r_node);
1154 
1155         p = rb_next(p);
1156         if (fn(req, arg))
1157             return;
1158     }
1159 }
1160 
1161 static bool osd_homeless(struct ceph_osd *osd)
1162 {
1163     return osd->o_osd == CEPH_HOMELESS_OSD;
1164 }
1165 
1166 static bool osd_registered(struct ceph_osd *osd)
1167 {
1168     verify_osdc_locked(osd->o_osdc);
1169 
1170     return !RB_EMPTY_NODE(&osd->o_node);
1171 }
1172 
1173 /*
1174  * Assumes @osd is zero-initialized.
1175  */
1176 static void osd_init(struct ceph_osd *osd)
1177 {
1178     refcount_set(&osd->o_ref, 1);
1179     RB_CLEAR_NODE(&osd->o_node);
1180     osd->o_requests = RB_ROOT;
1181     osd->o_linger_requests = RB_ROOT;
1182     osd->o_backoff_mappings = RB_ROOT;
1183     osd->o_backoffs_by_id = RB_ROOT;
1184     INIT_LIST_HEAD(&osd->o_osd_lru);
1185     INIT_LIST_HEAD(&osd->o_keepalive_item);
1186     osd->o_incarnation = 1;
1187     mutex_init(&osd->lock);
1188 }
1189 
1190 static void osd_cleanup(struct ceph_osd *osd)
1191 {
1192     WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1193     WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1194     WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1195     WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1196     WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1197     WARN_ON(!list_empty(&osd->o_osd_lru));
1198     WARN_ON(!list_empty(&osd->o_keepalive_item));
1199 
1200     if (osd->o_auth.authorizer) {
1201         WARN_ON(osd_homeless(osd));
1202         ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1203     }
1204 }
1205 
1206 /*
1207  * Track open sessions with osds.
1208  */
1209 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1210 {
1211     struct ceph_osd *osd;
1212 
1213     WARN_ON(onum == CEPH_HOMELESS_OSD);
1214 
1215     osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1216     osd_init(osd);
1217     osd->o_osdc = osdc;
1218     osd->o_osd = onum;
1219 
1220     ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1221 
1222     return osd;
1223 }
1224 
1225 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1226 {
1227     if (refcount_inc_not_zero(&osd->o_ref)) {
1228         dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1229              refcount_read(&osd->o_ref));
1230         return osd;
1231     } else {
1232         dout("get_osd %p FAIL\n", osd);
1233         return NULL;
1234     }
1235 }
1236 
1237 static void put_osd(struct ceph_osd *osd)
1238 {
1239     dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1240          refcount_read(&osd->o_ref) - 1);
1241     if (refcount_dec_and_test(&osd->o_ref)) {
1242         osd_cleanup(osd);
1243         kfree(osd);
1244     }
1245 }
1246 
1247 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1248 
1249 static void __move_osd_to_lru(struct ceph_osd *osd)
1250 {
1251     struct ceph_osd_client *osdc = osd->o_osdc;
1252 
1253     dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1254     BUG_ON(!list_empty(&osd->o_osd_lru));
1255 
1256     spin_lock(&osdc->osd_lru_lock);
1257     list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1258     spin_unlock(&osdc->osd_lru_lock);
1259 
1260     osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1261 }
1262 
1263 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1264 {
1265     if (RB_EMPTY_ROOT(&osd->o_requests) &&
1266         RB_EMPTY_ROOT(&osd->o_linger_requests))
1267         __move_osd_to_lru(osd);
1268 }
1269 
1270 static void __remove_osd_from_lru(struct ceph_osd *osd)
1271 {
1272     struct ceph_osd_client *osdc = osd->o_osdc;
1273 
1274     dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1275 
1276     spin_lock(&osdc->osd_lru_lock);
1277     if (!list_empty(&osd->o_osd_lru))
1278         list_del_init(&osd->o_osd_lru);
1279     spin_unlock(&osdc->osd_lru_lock);
1280 }
1281 
1282 /*
1283  * Close the connection and assign any leftover requests to the
1284  * homeless session.
1285  */
1286 static void close_osd(struct ceph_osd *osd)
1287 {
1288     struct ceph_osd_client *osdc = osd->o_osdc;
1289     struct rb_node *n;
1290 
1291     verify_osdc_wrlocked(osdc);
1292     dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1293 
1294     ceph_con_close(&osd->o_con);
1295 
1296     for (n = rb_first(&osd->o_requests); n; ) {
1297         struct ceph_osd_request *req =
1298             rb_entry(n, struct ceph_osd_request, r_node);
1299 
1300         n = rb_next(n); /* unlink_request() */
1301 
1302         dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1303         unlink_request(osd, req);
1304         link_request(&osdc->homeless_osd, req);
1305     }
1306     for (n = rb_first(&osd->o_linger_requests); n; ) {
1307         struct ceph_osd_linger_request *lreq =
1308             rb_entry(n, struct ceph_osd_linger_request, node);
1309 
1310         n = rb_next(n); /* unlink_linger() */
1311 
1312         dout(" reassigning lreq %p linger_id %llu\n", lreq,
1313              lreq->linger_id);
1314         unlink_linger(osd, lreq);
1315         link_linger(&osdc->homeless_osd, lreq);
1316     }
1317     clear_backoffs(osd);
1318 
1319     __remove_osd_from_lru(osd);
1320     erase_osd(&osdc->osds, osd);
1321     put_osd(osd);
1322 }
1323 
1324 /*
1325  * reset osd connect
1326  */
1327 static int reopen_osd(struct ceph_osd *osd)
1328 {
1329     struct ceph_entity_addr *peer_addr;
1330 
1331     dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1332 
1333     if (RB_EMPTY_ROOT(&osd->o_requests) &&
1334         RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1335         close_osd(osd);
1336         return -ENODEV;
1337     }
1338 
1339     peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1340     if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1341             !ceph_con_opened(&osd->o_con)) {
1342         struct rb_node *n;
1343 
1344         dout("osd addr hasn't changed and connection never opened, "
1345              "letting msgr retry\n");
1346         /* touch each r_stamp for handle_timeout()'s benfit */
1347         for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1348             struct ceph_osd_request *req =
1349                 rb_entry(n, struct ceph_osd_request, r_node);
1350             req->r_stamp = jiffies;
1351         }
1352 
1353         return -EAGAIN;
1354     }
1355 
1356     ceph_con_close(&osd->o_con);
1357     ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1358     osd->o_incarnation++;
1359 
1360     return 0;
1361 }
1362 
1363 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1364                       bool wrlocked)
1365 {
1366     struct ceph_osd *osd;
1367 
1368     if (wrlocked)
1369         verify_osdc_wrlocked(osdc);
1370     else
1371         verify_osdc_locked(osdc);
1372 
1373     if (o != CEPH_HOMELESS_OSD)
1374         osd = lookup_osd(&osdc->osds, o);
1375     else
1376         osd = &osdc->homeless_osd;
1377     if (!osd) {
1378         if (!wrlocked)
1379             return ERR_PTR(-EAGAIN);
1380 
1381         osd = create_osd(osdc, o);
1382         insert_osd(&osdc->osds, osd);
1383         ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1384                   &osdc->osdmap->osd_addr[osd->o_osd]);
1385     }
1386 
1387     dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1388     return osd;
1389 }
1390 
1391 /*
1392  * Create request <-> OSD session relation.
1393  *
1394  * @req has to be assigned a tid, @osd may be homeless.
1395  */
1396 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1397 {
1398     verify_osd_locked(osd);
1399     WARN_ON(!req->r_tid || req->r_osd);
1400     dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1401          req, req->r_tid);
1402 
1403     if (!osd_homeless(osd))
1404         __remove_osd_from_lru(osd);
1405     else
1406         atomic_inc(&osd->o_osdc->num_homeless);
1407 
1408     get_osd(osd);
1409     insert_request(&osd->o_requests, req);
1410     req->r_osd = osd;
1411 }
1412 
1413 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1414 {
1415     verify_osd_locked(osd);
1416     WARN_ON(req->r_osd != osd);
1417     dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1418          req, req->r_tid);
1419 
1420     req->r_osd = NULL;
1421     erase_request(&osd->o_requests, req);
1422     put_osd(osd);
1423 
1424     if (!osd_homeless(osd))
1425         maybe_move_osd_to_lru(osd);
1426     else
1427         atomic_dec(&osd->o_osdc->num_homeless);
1428 }
1429 
1430 static bool __pool_full(struct ceph_pg_pool_info *pi)
1431 {
1432     return pi->flags & CEPH_POOL_FLAG_FULL;
1433 }
1434 
1435 static bool have_pool_full(struct ceph_osd_client *osdc)
1436 {
1437     struct rb_node *n;
1438 
1439     for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1440         struct ceph_pg_pool_info *pi =
1441             rb_entry(n, struct ceph_pg_pool_info, node);
1442 
1443         if (__pool_full(pi))
1444             return true;
1445     }
1446 
1447     return false;
1448 }
1449 
1450 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1451 {
1452     struct ceph_pg_pool_info *pi;
1453 
1454     pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1455     if (!pi)
1456         return false;
1457 
1458     return __pool_full(pi);
1459 }
1460 
1461 /*
1462  * Returns whether a request should be blocked from being sent
1463  * based on the current osdmap and osd_client settings.
1464  */
1465 static bool target_should_be_paused(struct ceph_osd_client *osdc,
1466                     const struct ceph_osd_request_target *t,
1467                     struct ceph_pg_pool_info *pi)
1468 {
1469     bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1470     bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1471                ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1472                __pool_full(pi);
1473 
1474     WARN_ON(pi->id != t->target_oloc.pool);
1475     return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1476            ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1477            (osdc->osdmap->epoch < osdc->epoch_barrier);
1478 }
1479 
1480 static int pick_random_replica(const struct ceph_osds *acting)
1481 {
1482     int i = prandom_u32() % acting->size;
1483 
1484     dout("%s picked osd%d, primary osd%d\n", __func__,
1485          acting->osds[i], acting->primary);
1486     return i;
1487 }
1488 
1489 /*
1490  * Picks the closest replica based on client's location given by
1491  * crush_location option.  Prefers the primary if the locality is
1492  * the same.
1493  */
1494 static int pick_closest_replica(struct ceph_osd_client *osdc,
1495                 const struct ceph_osds *acting)
1496 {
1497     struct ceph_options *opt = osdc->client->options;
1498     int best_i, best_locality;
1499     int i = 0, locality;
1500 
1501     do {
1502         locality = ceph_get_crush_locality(osdc->osdmap,
1503                            acting->osds[i],
1504                            &opt->crush_locs);
1505         if (i == 0 ||
1506             (locality >= 0 && best_locality < 0) ||
1507             (locality >= 0 && best_locality >= 0 &&
1508              locality < best_locality)) {
1509             best_i = i;
1510             best_locality = locality;
1511         }
1512     } while (++i < acting->size);
1513 
1514     dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1515          acting->osds[best_i], best_locality, acting->primary);
1516     return best_i;
1517 }
1518 
/* Outcome of calc_target(). */
enum calc_target_result {
	CALC_TARGET_NO_ACTION = 0,	/* nothing that requires a resend */
	CALC_TARGET_NEED_RESEND = 1,	/* target changed or got unpaused */
	CALC_TARGET_POOL_DNE = 2,	/* pool not found in the osdmap */
};
1524 
1525 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1526                        struct ceph_osd_request_target *t,
1527                        bool any_change)
1528 {
1529     struct ceph_pg_pool_info *pi;
1530     struct ceph_pg pgid, last_pgid;
1531     struct ceph_osds up, acting;
1532     bool is_read = t->flags & CEPH_OSD_FLAG_READ;
1533     bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
1534     bool force_resend = false;
1535     bool unpaused = false;
1536     bool legacy_change = false;
1537     bool split = false;
1538     bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1539     bool recovery_deletes = ceph_osdmap_flag(osdc,
1540                          CEPH_OSDMAP_RECOVERY_DELETES);
1541     enum calc_target_result ct_res;
1542 
1543     t->epoch = osdc->osdmap->epoch;
1544     pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1545     if (!pi) {
1546         t->osd = CEPH_HOMELESS_OSD;
1547         ct_res = CALC_TARGET_POOL_DNE;
1548         goto out;
1549     }
1550 
1551     if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1552         if (t->last_force_resend < pi->last_force_request_resend) {
1553             t->last_force_resend = pi->last_force_request_resend;
1554             force_resend = true;
1555         } else if (t->last_force_resend == 0) {
1556             force_resend = true;
1557         }
1558     }
1559 
1560     /* apply tiering */
1561     ceph_oid_copy(&t->target_oid, &t->base_oid);
1562     ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1563     if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1564         if (is_read && pi->read_tier >= 0)
1565             t->target_oloc.pool = pi->read_tier;
1566         if (is_write && pi->write_tier >= 0)
1567             t->target_oloc.pool = pi->write_tier;
1568 
1569         pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1570         if (!pi) {
1571             t->osd = CEPH_HOMELESS_OSD;
1572             ct_res = CALC_TARGET_POOL_DNE;
1573             goto out;
1574         }
1575     }
1576 
1577     __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
1578     last_pgid.pool = pgid.pool;
1579     last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1580 
1581     ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1582     if (any_change &&
1583         ceph_is_new_interval(&t->acting,
1584                  &acting,
1585                  &t->up,
1586                  &up,
1587                  t->size,
1588                  pi->size,
1589                  t->min_size,
1590                  pi->min_size,
1591                  t->pg_num,
1592                  pi->pg_num,
1593                  t->sort_bitwise,
1594                  sort_bitwise,
1595                  t->recovery_deletes,
1596                  recovery_deletes,
1597                  &last_pgid))
1598         force_resend = true;
1599 
1600     if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1601         t->paused = false;
1602         unpaused = true;
1603     }
1604     legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1605             ceph_osds_changed(&t->acting, &acting,
1606                       t->used_replica || any_change);
1607     if (t->pg_num)
1608         split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1609 
1610     if (legacy_change || force_resend || split) {
1611         t->pgid = pgid; /* struct */
1612         ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1613         ceph_osds_copy(&t->acting, &acting);
1614         ceph_osds_copy(&t->up, &up);
1615         t->size = pi->size;
1616         t->min_size = pi->min_size;
1617         t->pg_num = pi->pg_num;
1618         t->pg_num_mask = pi->pg_num_mask;
1619         t->sort_bitwise = sort_bitwise;
1620         t->recovery_deletes = recovery_deletes;
1621 
1622         if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
1623                  CEPH_OSD_FLAG_LOCALIZE_READS)) &&
1624             !is_write && pi->type == CEPH_POOL_TYPE_REP &&
1625             acting.size > 1) {
1626             int pos;
1627 
1628             WARN_ON(!is_read || acting.osds[0] != acting.primary);
1629             if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
1630                 pos = pick_random_replica(&acting);
1631             } else {
1632                 pos = pick_closest_replica(osdc, &acting);
1633             }
1634             t->osd = acting.osds[pos];
1635             t->used_replica = pos > 0;
1636         } else {
1637             t->osd = acting.primary;
1638             t->used_replica = false;
1639         }
1640     }
1641 
1642     if (unpaused || legacy_change || force_resend || split)
1643         ct_res = CALC_TARGET_NEED_RESEND;
1644     else
1645         ct_res = CALC_TARGET_NO_ACTION;
1646 
1647 out:
1648     dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1649          legacy_change, force_resend, split, ct_res, t->osd);
1650     return ct_res;
1651 }
1652 
1653 static struct ceph_spg_mapping *alloc_spg_mapping(void)
1654 {
1655     struct ceph_spg_mapping *spg;
1656 
1657     spg = kmalloc(sizeof(*spg), GFP_NOIO);
1658     if (!spg)
1659         return NULL;
1660 
1661     RB_CLEAR_NODE(&spg->node);
1662     spg->backoffs = RB_ROOT;
1663     return spg;
1664 }
1665 
1666 static void free_spg_mapping(struct ceph_spg_mapping *spg)
1667 {
1668     WARN_ON(!RB_EMPTY_NODE(&spg->node));
1669     WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1670 
1671     kfree(spg);
1672 }
1673 
1674 /*
1675  * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1676  * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
1677  * defined only within a specific spgid; it does not pass anything to
1678  * children on split, or to another primary.
1679  */
1680 DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1681          RB_BYPTR, const struct ceph_spg *, node)
1682 
1683 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1684 {
1685     return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1686 }
1687 
1688 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1689                    void **pkey, size_t *pkey_len)
1690 {
1691     if (hoid->key_len) {
1692         *pkey = hoid->key;
1693         *pkey_len = hoid->key_len;
1694     } else {
1695         *pkey = hoid->oid;
1696         *pkey_len = hoid->oid_len;
1697     }
1698 }
1699 
/*
 * Compare two length-delimited byte strings.  The common prefix is
 * compared with memcmp(); if it is equal, the shorter name sorts
 * first.  Returns a negative value, 0 or a positive value.
 */
static int compare_names(const void *name1, size_t name1_len,
			 const void *name2, size_t name2_len)
{
	size_t common = name1_len < name2_len ? name1_len : name2_len;
	int diff = memcmp(name1, name2, common);

	if (diff)
		return diff;
	if (name1_len == name2_len)
		return 0;

	return name1_len < name2_len ? -1 : 1;
}
1714 
1715 static int hoid_compare(const struct ceph_hobject_id *lhs,
1716             const struct ceph_hobject_id *rhs)
1717 {
1718     void *effective_key1, *effective_key2;
1719     size_t effective_key1_len, effective_key2_len;
1720     int ret;
1721 
1722     if (lhs->is_max < rhs->is_max)
1723         return -1;
1724     if (lhs->is_max > rhs->is_max)
1725         return 1;
1726 
1727     if (lhs->pool < rhs->pool)
1728         return -1;
1729     if (lhs->pool > rhs->pool)
1730         return 1;
1731 
1732     if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1733         return -1;
1734     if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1735         return 1;
1736 
1737     ret = compare_names(lhs->nspace, lhs->nspace_len,
1738                 rhs->nspace, rhs->nspace_len);
1739     if (ret)
1740         return ret;
1741 
1742     hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1743     hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1744     ret = compare_names(effective_key1, effective_key1_len,
1745                 effective_key2, effective_key2_len);
1746     if (ret)
1747         return ret;
1748 
1749     ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1750     if (ret)
1751         return ret;
1752 
1753     if (lhs->snapid < rhs->snapid)
1754         return -1;
1755     if (lhs->snapid > rhs->snapid)
1756         return 1;
1757 
1758     return 0;
1759 }
1760 
/*
 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
 * compat stuff here.
 *
 * Assumes @hoid is zero-initialized.
 *
 * Fields are read in this order: key, oid, snapid, hash, is_max,
 * nspace, pool.  Returns 0 on success, the error reported by
 * ceph_start_decoding() or ceph_extract_encoded_string() when one of
 * those fails, and -EINVAL when struct_v is older than 4 or one of the
 * ceph_decode_*_safe() steps bails out to e_inval.
 *
 * Strings extracted before a later failure stay attached to @hoid; this
 * function does not free them.
 * NOTE(review): presumably the caller releases them (e.g. free_hoid())
 * -- confirm against the call sites.
 */
static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
{
	u8 struct_v;
	u32 struct_len;
	int ret;

	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	if (struct_v < 4) {
		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
		goto e_inval;
	}

	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
						GFP_NOIO);
	if (IS_ERR(hoid->key)) {
		ret = PTR_ERR(hoid->key);
		/* don't leave an ERR_PTR behind in @hoid */
		hoid->key = NULL;
		return ret;
	}

	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
						GFP_NOIO);
	if (IS_ERR(hoid->oid)) {
		ret = PTR_ERR(hoid->oid);
		/* don't leave an ERR_PTR behind in @hoid */
		hoid->oid = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);

	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
						   GFP_NOIO);
	if (IS_ERR(hoid->nspace)) {
		ret = PTR_ERR(hoid->nspace);
		/* don't leave an ERR_PTR behind in @hoid */
		hoid->nspace = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->pool, e_inval);

	/* all fields are in place, derive the cached hash data */
	ceph_hoid_build_hash_cache(hoid);
	return 0;

e_inval:
	return -EINVAL;
}
1819 
1820 static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1821 {
1822     return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1823            4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1824 }
1825 
1826 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1827 {
1828     ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1829     ceph_encode_string(p, end, hoid->key, hoid->key_len);
1830     ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1831     ceph_encode_64(p, hoid->snapid);
1832     ceph_encode_32(p, hoid->hash);
1833     ceph_encode_8(p, hoid->is_max);
1834     ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1835     ceph_encode_64(p, hoid->pool);
1836 }
1837 
1838 static void free_hoid(struct ceph_hobject_id *hoid)
1839 {
1840     if (hoid) {
1841         kfree(hoid->key);
1842         kfree(hoid->oid);
1843         kfree(hoid->nspace);
1844         kfree(hoid);
1845     }
1846 }
1847 
1848 static struct ceph_osd_backoff *alloc_backoff(void)
1849 {
1850     struct ceph_osd_backoff *backoff;
1851 
1852     backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1853     if (!backoff)
1854         return NULL;
1855 
1856     RB_CLEAR_NODE(&backoff->spg_node);
1857     RB_CLEAR_NODE(&backoff->id_node);
1858     return backoff;
1859 }
1860 
1861 static void free_backoff(struct ceph_osd_backoff *backoff)
1862 {
1863     WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1864     WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1865 
1866     free_hoid(backoff->begin);
1867     free_hoid(backoff->end);
1868     kfree(backoff);
1869 }
1870 
/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 *
 * The tree is linked through ->spg_node and ordered with
 * hoid_compare() on ->begin.  erase_backoff(), used by
 * clear_backoffs(), comes from this macro.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
			RB_BYVAL, spg_node);
1876 
1877 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1878                         const struct ceph_hobject_id *hoid)
1879 {
1880     struct rb_node *n = root->rb_node;
1881 
1882     while (n) {
1883         struct ceph_osd_backoff *cur =
1884             rb_entry(n, struct ceph_osd_backoff, spg_node);
1885         int cmp;
1886 
1887         cmp = hoid_compare(hoid, cur->begin);
1888         if (cmp < 0) {
1889             n = n->rb_left;
1890         } else if (cmp > 0) {
1891             if (hoid_compare(hoid, cur->end) < 0)
1892                 return cur;
1893 
1894             n = n->rb_right;
1895         } else {
1896             return cur;
1897         }
1898     }
1899 
1900     return NULL;
1901 }
1902 
/*
 * Each backoff has a unique id within its OSD session.
 *
 * This second index is keyed by ->id and linked through ->id_node.
 * erase_backoff_by_id(), used by clear_backoffs(), comes from this
 * macro.
 */
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1907 
1908 static void clear_backoffs(struct ceph_osd *osd)
1909 {
1910     while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1911         struct ceph_spg_mapping *spg =
1912             rb_entry(rb_first(&osd->o_backoff_mappings),
1913                  struct ceph_spg_mapping, node);
1914 
1915         while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1916             struct ceph_osd_backoff *backoff =
1917                 rb_entry(rb_first(&spg->backoffs),
1918                      struct ceph_osd_backoff, spg_node);
1919 
1920             erase_backoff(&spg->backoffs, backoff);
1921             erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1922             free_backoff(backoff);
1923         }
1924         erase_spg_mapping(&osd->o_backoff_mappings, spg);
1925         free_spg_mapping(spg);
1926     }
1927 }
1928 
1929 /*
1930  * Set up a temporary, non-owning view into @t.
1931  */
1932 static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1933                   const struct ceph_osd_request_target *t)
1934 {
1935     hoid->key = NULL;
1936     hoid->key_len = 0;
1937     hoid->oid = t->target_oid.name;
1938     hoid->oid_len = t->target_oid.name_len;
1939     hoid->snapid = CEPH_NOSNAP;
1940     hoid->hash = t->pgid.seed;
1941     hoid->is_max = false;
1942     if (t->target_oloc.pool_ns) {
1943         hoid->nspace = t->target_oloc.pool_ns->str;
1944         hoid->nspace_len = t->target_oloc.pool_ns->len;
1945     } else {
1946         hoid->nspace = NULL;
1947         hoid->nspace_len = 0;
1948     }
1949     hoid->pool = t->target_oloc.pool;
1950     ceph_hoid_build_hash_cache(hoid);
1951 }
1952 
1953 static bool should_plug_request(struct ceph_osd_request *req)
1954 {
1955     struct ceph_osd *osd = req->r_osd;
1956     struct ceph_spg_mapping *spg;
1957     struct ceph_osd_backoff *backoff;
1958     struct ceph_hobject_id hoid;
1959 
1960     spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1961     if (!spg)
1962         return false;
1963 
1964     hoid_fill_from_target(&hoid, &req->r_t);
1965     backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
1966     if (!backoff)
1967         return false;
1968 
1969     dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1970          __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
1971          backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
1972     return true;
1973 }
1974 
1975 /*
1976  * Keep get_num_data_items() in sync with this function.
1977  */
1978 static void setup_request_data(struct ceph_osd_request *req)
1979 {
1980     struct ceph_msg *request_msg = req->r_request;
1981     struct ceph_msg *reply_msg = req->r_reply;
1982     struct ceph_osd_req_op *op;
1983 
1984     if (req->r_request->num_data_items || req->r_reply->num_data_items)
1985         return;
1986 
1987     WARN_ON(request_msg->data_length || reply_msg->data_length);
1988     for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
1989         switch (op->op) {
1990         /* request */
1991         case CEPH_OSD_OP_WRITE:
1992         case CEPH_OSD_OP_WRITEFULL:
1993             WARN_ON(op->indata_len != op->extent.length);
1994             ceph_osdc_msg_data_add(request_msg,
1995                            &op->extent.osd_data);
1996             break;
1997         case CEPH_OSD_OP_SETXATTR:
1998         case CEPH_OSD_OP_CMPXATTR:
1999             WARN_ON(op->indata_len != op->xattr.name_len +
2000                           op->xattr.value_len);
2001             ceph_osdc_msg_data_add(request_msg,
2002                            &op->xattr.osd_data);
2003             break;
2004         case CEPH_OSD_OP_NOTIFY_ACK:
2005             ceph_osdc_msg_data_add(request_msg,
2006                            &op->notify_ack.request_data);
2007             break;
2008         case CEPH_OSD_OP_COPY_FROM2:
2009             ceph_osdc_msg_data_add(request_msg,
2010                            &op->copy_from.osd_data);
2011             break;
2012 
2013         /* reply */
2014         case CEPH_OSD_OP_STAT:
2015             ceph_osdc_msg_data_add(reply_msg,
2016                            &op->raw_data_in);
2017             break;
2018         case CEPH_OSD_OP_READ:
2019             ceph_osdc_msg_data_add(reply_msg,
2020                            &op->extent.osd_data);
2021             break;
2022         case CEPH_OSD_OP_LIST_WATCHERS:
2023             ceph_osdc_msg_data_add(reply_msg,
2024                            &op->list_watchers.response_data);
2025             break;
2026 
2027         /* both */
2028         case CEPH_OSD_OP_CALL:
2029             WARN_ON(op->indata_len != op->cls.class_len +
2030                           op->cls.method_len +
2031                           op->cls.indata_len);
2032             ceph_osdc_msg_data_add(request_msg,
2033                            &op->cls.request_info);
2034             /* optional, can be NONE */
2035             ceph_osdc_msg_data_add(request_msg,
2036                            &op->cls.request_data);
2037             /* optional, can be NONE */
2038             ceph_osdc_msg_data_add(reply_msg,
2039                            &op->cls.response_data);
2040             break;
2041         case CEPH_OSD_OP_NOTIFY:
2042             ceph_osdc_msg_data_add(request_msg,
2043                            &op->notify.request_data);
2044             ceph_osdc_msg_data_add(reply_msg,
2045                            &op->notify.response_data);
2046             break;
2047         }
2048     }
2049 }
2050 
/*
 * Encode @pgid at *p: a one-byte 1, the 64-bit pool, the 32-bit seed
 * and a 32-bit -1 for the "preferred" field.
 */
static void encode_pgid(void **p, const struct ceph_pg *pgid)
{
	ceph_encode_8(p, 1);
	ceph_encode_64(p, pgid->pool);
	ceph_encode_32(p, pgid->seed);
	ceph_encode_32(p, -1); /* preferred */
}
2058 
/*
 * Encode @spgid at *p as a versioned block (struct_v 1, compat 1)
 * holding the encoded pgid followed by the one-byte shard.
 */
static void encode_spgid(void **p, const struct ceph_spg *spgid)
{
	/* payload: encoded pgid plus one byte of shard */
	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
	encode_pgid(p, &spgid->pgid);
	ceph_encode_8(p, spgid->shard);
}
2065 
2066 static void encode_oloc(void **p, void *end,
2067             const struct ceph_object_locator *oloc)
2068 {
2069     ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
2070     ceph_encode_64(p, oloc->pool);
2071     ceph_encode_32(p, -1); /* preferred */
2072     ceph_encode_32(p, 0);  /* key len */
2073     if (oloc->pool_ns)
2074         ceph_encode_string(p, end, oloc->pool_ns->str,
2075                    oloc->pool_ns->len);
2076     else
2077         ceph_encode_32(p, 0);
2078 }
2079 
/*
 * Encode @req into the front of @msg using the MOSDOp v8 layout,
 * leaving out the trailing 8-byte features field -- room for it is
 * asserted here and it is appended in encode_request_finish().
 *
 * Also attaches the request's data items (setup_request_data()) and
 * fills in the header's version, front_len, data_len and data_off.
 */
static void encode_request_partial(struct ceph_osd_request *req,
				   struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u32 data_len = 0;
	int i;

	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
		/* snapshots aren't writeable */
		WARN_ON(req->r_snapid != CEPH_NOSNAP);
	} else {
		/* these fields are not expected to be set without the WRITE flag */
		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
			req->r_data_offset || req->r_snapc);
	}

	setup_request_data(req);

	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
	ceph_encode_32(&p, req->r_flags);

	/* reqid */
	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
	memset(p, 0, sizeof(struct ceph_osd_reqid));
	p += sizeof(struct ceph_osd_reqid);

	/* trace */
	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
	p += sizeof(struct ceph_blkin_trace_info);

	ceph_encode_32(&p, 0); /* client_inc, always 0 */
	ceph_encode_timespec64(p, &req->r_mtime);
	p += sizeof(struct ceph_timespec);

	encode_oloc(&p, end, &req->r_t.target_oloc);
	ceph_encode_string(&p, end, req->r_t.target_oid.name,
			   req->r_t.target_oid.name_len);

	/* ops, can imply data */
	ceph_encode_16(&p, req->r_num_ops);
	for (i = 0; i < req->r_num_ops; i++) {
		/* each op occupies a fixed-size slot; sum up its data length */
		data_len += osd_req_encode_op(p, &req->r_ops[i]);
		p += sizeof(struct ceph_osd_op);
	}

	ceph_encode_64(&p, req->r_snapid); /* snapid */
	if (req->r_snapc) {
		ceph_encode_64(&p, req->r_snapc->seq);
		ceph_encode_32(&p, req->r_snapc->num_snaps);
		for (i = 0; i < req->r_snapc->num_snaps; i++)
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
	} else {
		ceph_encode_64(&p, 0); /* snap_seq */
		ceph_encode_32(&p, 0); /* snaps len */
	}

	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
	BUG_ON(p > end - 8); /* space for features */

	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
	/* front_len is finalized in encode_request_finish() */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	msg->hdr.data_len = cpu_to_le32(data_len);
	/*
	 * The header "data_off" is a hint to the receiver allowing it
	 * to align received data into its buffers such that there's no
	 * need to re-copy it before writing it to disk (direct I/O).
	 */
	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
}
2156 
/*
 * Finish the front of @msg that encode_request_partial() started.
 *
 * If the peer on msg->con advertises RESEND_ON_SPLIT, the 64-bit peer
 * feature mask is appended and the message stays at v8.  Otherwise
 * the front is re-encoded in place into the v4 layout.  Either way
 * front.iov_len and hdr.front_len are set to the final length.
 */
static void encode_request_finish(struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const partial_end = p + msg->front.iov_len;
	void *const end = p + msg->front_alloc_len;

	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
		/* luminous OSD -- encode features and be done */
		p = partial_end;
		ceph_encode_64(&p, msg->con->peer_features);
	} else {
		/* mirrors the fixed-size start of the v8 encoding */
		struct {
			char spgid[CEPH_ENCODING_START_BLK_LEN +
				   CEPH_PGID_ENCODING_LEN + 1];
			__le32 hash;
			__le32 epoch;
			__le32 flags;
			char reqid[CEPH_ENCODING_START_BLK_LEN +
				   sizeof(struct ceph_osd_reqid)];
			char trace[sizeof(struct ceph_blkin_trace_info)];
			__le32 client_inc;
			struct ceph_timespec mtime;
		} __packed head;
		struct ceph_pg pgid;
		void *oloc, *oid, *tail;
		int oloc_len, oid_len, tail_len;
		int len;

		/*
		 * Pre-luminous OSD -- reencode v8 into v4 using @head
		 * as a temporary buffer.  Encode the raw PG; the rest
		 * is just a matter of moving oloc, oid and tail blobs
		 * around.
		 */
		memcpy(&head, p, sizeof(head));
		p += sizeof(head);

		/* locate the oloc blob and pull the pool id out of it */
		oloc = p;
		p += CEPH_ENCODING_START_BLK_LEN;
		pgid.pool = ceph_decode_64(&p);
		p += 4 + 4; /* preferred, key len */
		len = ceph_decode_32(&p);
		p += len;   /* nspace */
		oloc_len = p - oloc;

		/* locate the oid blob (length prefix + name) */
		oid = p;
		len = ceph_decode_32(&p);
		p += len;
		oid_len = p - oid;

		/* everything up to the end of the partial encoding */
		tail = p;
		tail_len = partial_end - p;

		/* rewrite from the start of the front, v4 field order */
		p = msg->front.iov_base;
		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));

		/* reassert_version */
		memset(p, 0, sizeof(struct ceph_eversion));
		p += sizeof(struct ceph_eversion);

		/* the write cursor must stay behind each blob it is about to move */
		BUG_ON(p >= oloc);
		memmove(p, oloc, oloc_len);
		p += oloc_len;

		pgid.seed = le32_to_cpu(head.hash);
		encode_pgid(&p, &pgid); /* raw pg */

		BUG_ON(p >= oid);
		memmove(p, oid, oid_len);
		p += oid_len;

		/* tail -- ops, snapid, snapc, retry_attempt */
		BUG_ON(p >= tail);
		memmove(p, tail, tail_len);
		p += tail_len;

		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
	     le16_to_cpu(msg->hdr.version));
}
2248 
2249 /*
2250  * @req has to be assigned a tid and registered.
2251  */
2252 static void send_request(struct ceph_osd_request *req)
2253 {
2254     struct ceph_osd *osd = req->r_osd;
2255 
2256     verify_osd_locked(osd);
2257     WARN_ON(osd->o_osd != req->r_t.osd);
2258 
2259     /* backoff? */
2260     if (should_plug_request(req))
2261         return;
2262 
2263     /*
2264      * We may have a previously queued request message hanging
2265      * around.  Cancel it to avoid corrupting the msgr.
2266      */
2267     if (req->r_sent)
2268         ceph_msg_revoke(req->r_request);
2269 
2270     req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2271     if (req->r_attempts)
2272         req->r_flags |= CEPH_OSD_FLAG_RETRY;
2273     else
2274         WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2275 
2276     encode_request_partial(req, req->r_request);
2277 
2278     dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2279          __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2280          req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2281          req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2282          req->r_attempts);
2283 
2284     req->r_t.paused = false;
2285     req->r_stamp = jiffies;
2286     req->r_attempts++;
2287 
2288     req->r_sent = osd->o_incarnation;
2289     req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2290     ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2291 }
2292 
2293 static void maybe_request_map(struct ceph_osd_client *osdc)
2294 {
2295     bool continuous = false;
2296 
2297     verify_osdc_locked(osdc);
2298     WARN_ON(!osdc->osdmap->epoch);
2299 
2300     if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2301         ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2302         ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2303         dout("%s osdc %p continuous\n", __func__, osdc);
2304         continuous = true;
2305     } else {
2306         dout("%s osdc %p onetime\n", __func__, osdc);
2307     }
2308 
2309     if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2310                    osdc->osdmap->epoch + 1, continuous))
2311         ceph_monc_renew_subs(&osdc->client->monc);
2312 }
2313 
/* defined further down, needed by __submit_request() */
static void complete_request(struct ceph_osd_request *req, int err);
static void send_map_check(struct ceph_osd_request *req);

/*
 * Compute the target of @req, attach it to the matching OSD session,
 * give it a tid and then send it, leave it paused, or fail it.
 *
 * @wrlocked tells whether osdc->lock is held for write on entry.
 * NOTE(review): the promote path does up_read(), so the lock is
 * presumably held for read when @wrlocked is false -- confirm against
 * the callers.  If the lock had to be promoted to write here, it is
 * downgraded again before returning.
 */
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd *osd;
	enum calc_target_result ct_res;
	int err = 0;
	bool need_send = false;
	bool promoted = false;

	WARN_ON(req->r_tid);
	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);

again:
	ct_res = calc_target(osdc, &req->r_t, false);
	/* send_map_check() below needs the write lock */
	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
		goto promote;

	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
	if (IS_ERR(osd)) {
		/* -EAGAIN without the write lock is the only expected failure */
		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
		goto promote;
	}

	/* decide: fail with err, leave paused, or send */
	if (osdc->abort_err) {
		dout("req %p abort_err %d\n", req, osdc->abort_err);
		err = osdc->abort_err;
	} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
		     osdc->epoch_barrier);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
		dout("req %p pausewr\n", req);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
		dout("req %p pauserd\n", req);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
				     CEPH_OSD_FLAG_FULL_FORCE)) &&
		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		    pool_full(osdc, req->r_t.base_oloc.pool))) {
		dout("req %p full/pool_full\n", req);
		if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
			err = -ENOSPC;
		} else {
			if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL))
				pr_warn_ratelimited("cluster is full (osdmap FULL)\n");
			else
				pr_warn_ratelimited("pool %lld is full or reached quota\n",
						    req->r_t.base_oloc.pool);
			req->r_t.paused = true;
			maybe_request_map(osdc);
		}
	} else if (!osd_homeless(osd)) {
		need_send = true;
	} else {
		/* homeless: nothing to send to, ask for a newer map */
		maybe_request_map(osdc);
	}

	mutex_lock(&osd->lock);
	/*
	 * Assign the tid atomically with send_request() to protect
	 * multiple writes to the same object from racing with each
	 * other, resulting in out of order ops on the OSDs.
	 */
	req->r_tid = atomic64_inc_return(&osdc->last_tid);
	link_request(osd, req);
	if (need_send)
		send_request(req);
	else if (err)
		complete_request(req, err);
	mutex_unlock(&osd->lock);

	if (!err && ct_res == CALC_TARGET_POOL_DNE)
		send_map_check(req);

	/* restore the lock mode the caller came in with */
	if (promoted)
		downgrade_write(&osdc->lock);
	return;

promote:
	/* swap the read lock for the write lock and start over */
	up_read(&osdc->lock);
	down_write(&osdc->lock);
	wrlocked = true;
	promoted = true;
	goto again;
}
2409 
2410 static void account_request(struct ceph_osd_request *req)
2411 {
2412     WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2413     WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2414 
2415     req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2416     atomic_inc(&req->r_osdc->num_requests);
2417 
2418     req->r_start_stamp = jiffies;
2419     req->r_start_latency = ktime_get();
2420 }
2421 
/*
 * Take a reference on @req, account for it and hand it over to
 * __submit_request().  @wrlocked is passed through unchanged.
 */
static void submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	ceph_osdc_get_request(req);
	account_request(req);
	__submit_request(req, wrlocked);
}
2428 
2429 static void finish_request(struct ceph_osd_request *req)
2430 {
2431     struct ceph_osd_client *osdc = req->r_osdc;
2432 
2433     WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2434     dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2435 
2436     req->r_end_latency = ktime_get();
2437 
2438     if (req->r_osd)
2439         unlink_request(req->r_osd, req);
2440     atomic_dec(&osdc->num_requests);
2441 
2442     /*
2443      * If an OSD has failed or returned and a request has been sent
2444      * twice, it's possible to get a reply and end up here while the
2445      * request message is queued for delivery.  We will ignore the
2446      * reply, so not a big deal, but better to try and catch it.
2447      */
2448     ceph_msg_revoke(req->r_request);
2449     ceph_msg_revoke_incoming(req->r_reply);
2450 }
2451 
/*
 * Deliver the completion of @req: invoke ->r_callback if one is set,
 * wake up everything waiting on ->r_completion, then drop a reference
 * on @req.
 */
static void __complete_request(struct ceph_osd_request *req)
{
	dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
	     req->r_tid, req->r_callback, req->r_result);

	if (req->r_callback)
		req->r_callback(req);
	complete_all(&req->r_completion);
	/* @req must not be touched after this put */
	ceph_osdc_put_request(req);
}
2462 
2463 static void complete_request_workfn(struct work_struct *work)
2464 {
2465     struct ceph_osd_request *req =
2466         container_of(work, struct ceph_osd_request, r_complete_work);
2467 
2468     __complete_request(req);
2469 }
2470 
/*
 * This is open-coded in handle_reply().
 *
 * Record @err as the result of @req, take it out of flight and queue
 * the rest of the completion (callback, waking waiters, reference
 * drop) on the client's completion workqueue.
 */
static void complete_request(struct ceph_osd_request *req, int err)
{
	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);

	req->r_result = err;
	finish_request(req);

	/* __complete_request() runs from the workqueue, not from here */
	INIT_WORK(&req->r_complete_work, complete_request_workfn);
	queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
}
2484 
2485 static void cancel_map_check(struct ceph_osd_request *req)
2486 {
2487     struct ceph_osd_client *osdc = req->r_osdc;
2488     struct ceph_osd_request *lookup_req;
2489 
2490     verify_osdc_wrlocked(osdc);
2491 
2492     lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2493     if (!lookup_req)
2494         return;
2495 
2496     WARN_ON(lookup_req != req);
2497     erase_request_mc(&osdc->map_checks, req);
2498     ceph_osdc_put_request(req);
2499 }
2500 
/*
 * Cancel @req: drop any pending map check, take it out of flight,
 * wake up waiters on ->r_completion and drop a reference.  Unlike
 * __complete_request(), ->r_callback is not invoked.
 */
static void cancel_request(struct ceph_osd_request *req)
{
	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

	cancel_map_check(req);
	finish_request(req);
	complete_all(&req->r_completion);
	ceph_osdc_put_request(req);
}
2510 
/*
 * Fail @req with @err: drop any pending map check, then complete the
 * request through complete_request().
 */
static void abort_request(struct ceph_osd_request *req, int err)
{
	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);

	cancel_map_check(req);
	complete_request(req, err);
}
2518 
/*
 * for_each_request() callback: abort @req with the error code that
 * @arg points to.
 */
static int abort_fn(struct ceph_osd_request *req, void *arg)
{
	const int *errp = arg;

	abort_request(req, *errp);

	/* continue iteration */
	return 0;
}
2526 
/*
 * Abort all in-flight requests with @err and arrange for all future
 * requests to be failed immediately.
 *
 * Both steps happen under osdc->lock held for write; the second is
 * done by recording @err in osdc->abort_err, which __submit_request()
 * checks.
 */
void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
{
	dout("%s osdc %p err %d\n", __func__, osdc, err);
	down_write(&osdc->lock);
	for_each_request(osdc, abort_fn, &err);
	osdc->abort_err = err;
	up_write(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_abort_requests);
2540 
/*
 * Reset osdc->abort_err to 0, under osdc->lock held for write, so
 * that __submit_request() stops failing new requests because of it.
 */
void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
{
	down_write(&osdc->lock);
	osdc->abort_err = 0;
	up_write(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
2548 
2549 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2550 {
2551     if (likely(eb > osdc->epoch_barrier)) {
2552         dout("updating epoch_barrier from %u to %u\n",
2553                 osdc->epoch_barrier, eb);
2554         osdc->epoch_barrier = eb;
2555         /* Request map if we're not to the barrier yet */
2556         if (eb > osdc->osdmap->epoch)
2557             maybe_request_map(osdc);
2558     }
2559 }
2560 
2561 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2562 {
2563     down_read(&osdc->lock);
2564     if (unlikely(eb > osdc->epoch_barrier)) {
2565         up_read(&osdc->lock);
2566         down_write(&osdc->lock);
2567         update_epoch_barrier(osdc, eb);
2568         up_write(&osdc->lock);
2569     } else {
2570         up_read(&osdc->lock);
2571     }
2572 }
2573 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2574 
2575 /*
2576  * We can end up releasing caps as a result of abort_request().
2577  * In that case, we probably want to ensure that the cap release message
2578  * has an updated epoch barrier in it, so set the epoch barrier prior to
2579  * aborting the first request.
2580  */
2581 static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2582 {
2583     struct ceph_osd_client *osdc = req->r_osdc;
2584     bool *victims = arg;
2585 
2586     if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2587         (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2588          pool_full(osdc, req->r_t.base_oloc.pool))) {
2589         if (!*victims) {
2590             update_epoch_barrier(osdc, osdc->osdmap->epoch);
2591             *victims = true;
2592         }
2593         abort_request(req, -ENOSPC);
2594     }
2595 
2596     return 0; /* continue iteration */
2597 }
2598 
2599 /*
2600  * Drop all pending requests that are stalled waiting on a full condition to
2601  * clear, and complete them with ENOSPC as the return code. Set the
2602  * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2603  * cancelled.
2604  */
2605 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2606 {
2607     bool victims = false;
2608 
2609     if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
2610         (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2611         for_each_request(osdc, abort_on_full_fn, &victims);
2612 }
2613 
2614 static void check_pool_dne(struct ceph_osd_request *req)
2615 {
2616     struct ceph_osd_client *osdc = req->r_osdc;
2617     struct ceph_osdmap *map = osdc->osdmap;
2618 
2619     verify_osdc_wrlocked(osdc);
2620     WARN_ON(!map->epoch);
2621 
2622     if (req->r_attempts) {
2623         /*
2624          * We sent a request earlier, which means that
2625          * previously the pool existed, and now it does not
2626          * (i.e., it was deleted).
2627          */
2628         req->r_map_dne_bound = map->epoch;
2629         dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2630              req->r_tid);
2631     } else {
2632         dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2633              req, req->r_tid, req->r_map_dne_bound, map->epoch);
2634     }
2635 
2636     if (req->r_map_dne_bound) {
2637         if (map->epoch >= req->r_map_dne_bound) {
2638             /* we had a new enough map */
2639             pr_info_ratelimited("tid %llu pool does not exist\n",
2640                         req->r_tid);
2641             complete_request(req, -ENOENT);
2642         }
2643     } else {
2644         send_map_check(req);
2645     }
2646 }
2647 
2648 static void map_check_cb(struct ceph_mon_generic_request *greq)
2649 {
2650     struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2651     struct ceph_osd_request *req;
2652     u64 tid = greq->private_data;
2653 
2654     WARN_ON(greq->result || !greq->u.newest);
2655 
2656     down_write(&osdc->lock);
2657     req = lookup_request_mc(&osdc->map_checks, tid);
2658     if (!req) {
2659         dout("%s tid %llu dne\n", __func__, tid);
2660         goto out_unlock;
2661     }
2662 
2663     dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2664          req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2665     if (!req->r_map_dne_bound)
2666         req->r_map_dne_bound = greq->u.newest;
2667     erase_request_mc(&osdc->map_checks, req);
2668     check_pool_dne(req);
2669 
2670     ceph_osdc_put_request(req);
2671 out_unlock:
2672     up_write(&osdc->lock);
2673 }
2674 
2675 static void send_map_check(struct ceph_osd_request *req)
2676 {
2677     struct ceph_osd_client *osdc = req->r_osdc;
2678     struct ceph_osd_request *lookup_req;
2679     int ret;
2680 
2681     verify_osdc_wrlocked(osdc);
2682 
2683     lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2684     if (lookup_req) {
2685         WARN_ON(lookup_req != req);
2686         return;
2687     }
2688 
2689     ceph_osdc_get_request(req);
2690     insert_request_mc(&osdc->map_checks, req);
2691     ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2692                       map_check_cb, req->r_tid);
2693     WARN_ON(ret);
2694 }
2695 
2696 /*
2697  * lingering requests, watch/notify v2 infrastructure
2698  */
/*
 * kref release function for a linger request (see linger_put()).
 *
 * Warns if the linger request is still linked into any tree or list
 * or still attached to an OSD, then releases what it holds: the
 * request pagelist, the notify_id page vector, a reference on each of
 * reg_req and ping_req, the target, and finally the structure itself.
 */
static void linger_release(struct kref *kref)
{
	struct ceph_osd_linger_request *lreq =
	    container_of(kref, struct ceph_osd_linger_request, kref);

	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
	     lreq->reg_req, lreq->ping_req);
	/* must be fully unlinked by now */
	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
	WARN_ON(!list_empty(&lreq->scan_item));
	WARN_ON(!list_empty(&lreq->pending_lworks));
	WARN_ON(lreq->osd);

	/* both of these are optional */
	if (lreq->request_pl)
		ceph_pagelist_release(lreq->request_pl);
	if (lreq->notify_id_pages)
		ceph_release_page_vector(lreq->notify_id_pages, 1);

	ceph_osdc_put_request(lreq->reg_req);
	ceph_osdc_put_request(lreq->ping_req);
	target_destroy(&lreq->t);
	kfree(lreq);
}
2723 
2724 static void linger_put(struct ceph_osd_linger_request *lreq)
2725 {
2726     if (lreq)
2727         kref_put(&lreq->kref, linger_release);
2728 }
2729 
2730 static struct ceph_osd_linger_request *
2731 linger_get(struct ceph_osd_linger_request *lreq)
2732 {
2733     kref_get(&lreq->kref);
2734     return lreq;
2735 }
2736 
2737 static struct ceph_osd_linger_request *
2738 linger_alloc(struct ceph_osd_client *osdc)
2739 {
2740     struct ceph_osd_linger_request *lreq;
2741 
2742     lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2743     if (!lreq)
2744         return NULL;
2745 
2746     kref_init(&lreq->kref);
2747     mutex_init(&lreq->lock);
2748     RB_CLEAR_NODE(&lreq->node);
2749     RB_CLEAR_NODE(&lreq->osdc_node);
2750     RB_CLEAR_NODE(&lreq->mc_node);
2751     INIT_LIST_HEAD(&lreq->scan_item);
2752     INIT_LIST_HEAD(&lreq->pending_lworks);
2753     init_completion(&lreq->reg_commit_wait);
2754     init_completion(&lreq->notify_finish_wait);
2755 
2756     lreq->osdc = osdc;
2757     target_init(&lreq->t);
2758 
2759     dout("%s lreq %p\n", __func__, lreq);
2760     return lreq;
2761 }
2762 
/*
 * rbtree helpers for linger requests, all keyed by ->linger_id
 * (NOTE(review): exact set of generated functions is defined by the
 * DEFINE_RB_* macros elsewhere -- confirm there):
 *
 *   linger      - via ->node, used below as insert_linger()/erase_linger()
 *                 on an OSD's o_linger_requests tree
 *   linger_osdc - via ->osdc_node, used below on osdc->linger_requests
 *   linger_mc   - via ->mc_node, used below on osdc->linger_map_checks
 */
2763 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2764 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2765 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2766 
2767 /*
2768  * Create linger request <-> OSD session relation.
2769  *
2770  * @lreq has to be registered, @osd may be homeless.
2771  */
2772 static void link_linger(struct ceph_osd *osd,
2773             struct ceph_osd_linger_request *lreq)
2774 {
2775     verify_osd_locked(osd);
2776     WARN_ON(!lreq->linger_id || lreq->osd);
2777     dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2778          osd->o_osd, lreq, lreq->linger_id);
2779 
2780     if (!osd_homeless(osd))
2781         __remove_osd_from_lru(osd);
2782     else
2783         atomic_inc(&osd->o_osdc->num_homeless);
2784 
2785     get_osd(osd);
2786     insert_linger(&osd->o_linger_requests, lreq);
2787     lreq->osd = osd;
2788 }
2789 
2790 static void unlink_linger(struct ceph_osd *osd,
2791               struct ceph_osd_linger_request *lreq)
2792 {
2793     verify_osd_locked(osd);
2794     WARN_ON(lreq->osd != osd);
2795     dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2796          osd->o_osd, lreq, lreq->linger_id);
2797 
2798     lreq->osd = NULL;
2799     erase_linger(&osd->o_linger_requests, lreq);
2800     put_osd(osd);
2801 
2802     if (!osd_homeless(osd))
2803         maybe_move_osd_to_lru(osd);
2804     else
2805         atomic_dec(&osd->o_osdc->num_homeless);
2806 }
2807 
2808 static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2809 {
2810     verify_osdc_locked(lreq->osdc);
2811 
2812     return !RB_EMPTY_NODE(&lreq->osdc_node);
2813 }
2814 
2815 static bool linger_registered(struct ceph_osd_linger_request *lreq)
2816 {
2817     struct ceph_osd_client *osdc = lreq->osdc;
2818     bool registered;
2819 
2820     down_read(&osdc->lock);
2821     registered = __linger_registered(lreq);
2822     up_read(&osdc->lock);
2823 
2824     return registered;
2825 }
2826 
2827 static void linger_register(struct ceph_osd_linger_request *lreq)
2828 {
2829     struct ceph_osd_client *osdc = lreq->osdc;
2830 
2831     verify_osdc_wrlocked(osdc);
2832     WARN_ON(lreq->linger_id);
2833 
2834     linger_get(lreq);
2835     lreq->linger_id = ++osdc->last_linger_id;
2836     insert_linger_osdc(&osdc->linger_requests, lreq);
2837 }
2838 
2839 static void linger_unregister(struct ceph_osd_linger_request *lreq)
2840 {
2841     struct ceph_osd_client *osdc = lreq->osdc;
2842 
2843     verify_osdc_wrlocked(osdc);
2844 
2845     erase_linger_osdc(&osdc->linger_requests, lreq);
2846     linger_put(lreq);
2847 }
2848 
2849 static void cancel_linger_request(struct ceph_osd_request *req)
2850 {
2851     struct ceph_osd_linger_request *lreq = req->r_priv;
2852 
2853     WARN_ON(!req->r_linger);
2854     cancel_request(req);
2855     linger_put(lreq);
2856 }
2857 
/*
 * A unit of deferred work for a linger request, run on osdc->notify_wq
 * (see lwork_queue()).  Allocated by lwork_alloc(), freed by lwork_free().
 * The union holds the arguments for whichever work function was chosen:
 * ->notify for do_watch_notify(), ->error for do_watch_error().
 */
2858 struct linger_work {
	/* work item; its function is set by lwork_alloc() */
2859     struct work_struct work;
	/* owning linger request; lwork_alloc() takes a reference on it */
2860     struct ceph_osd_linger_request *lreq;
	/* link in lreq->pending_lworks while queued */
2861     struct list_head pending_item;
	/* jiffies value recorded by lwork_queue() */
2862     unsigned long queued_stamp;
2863 
2864     union {
		/* arguments passed to lreq->wcb by do_watch_notify() */
2865         struct {
2866             u64 notify_id;
2867             u64 notifier_id;
2868             void *payload; /* points into @msg front */
2869             size_t payload_len;
2870 
2871             struct ceph_msg *msg; /* for ceph_msg_put() */
2872         } notify;
		/* error code passed to lreq->errcb by do_watch_error() */
2873         struct {
2874             int err;
2875         } error;
2876     };
2877 };
2878 
2879 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2880                        work_func_t workfn)
2881 {
2882     struct linger_work *lwork;
2883 
2884     lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2885     if (!lwork)
2886         return NULL;
2887 
2888     INIT_WORK(&lwork->work, workfn);
2889     INIT_LIST_HEAD(&lwork->pending_item);
2890     lwork->lreq = linger_get(lreq);
2891 
2892     return lwork;
2893 }
2894 
2895 static void lwork_free(struct linger_work *lwork)
2896 {
2897     struct ceph_osd_linger_request *lreq = lwork->lreq;
2898 
2899     mutex_lock(&lreq->lock);
2900     list_del(&lwork->pending_item);
2901     mutex_unlock(&lreq->lock);
2902 
2903     linger_put(lreq);
2904     kfree(lwork);
2905 }
2906 
2907 static void lwork_queue(struct linger_work *lwork)
2908 {
2909     struct ceph_osd_linger_request *lreq = lwork->lreq;
2910     struct ceph_osd_client *osdc = lreq->osdc;
2911 
2912     verify_lreq_locked(lreq);
2913     WARN_ON(!list_empty(&lwork->pending_item));
2914 
2915     lwork->queued_stamp = jiffies;
2916     list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2917     queue_work(osdc->notify_wq, &lwork->work);
2918 }
2919 
2920 static void do_watch_notify(struct work_struct *w)
2921 {
2922     struct linger_work *lwork = container_of(w, struct linger_work, work);
2923     struct ceph_osd_linger_request *lreq = lwork->lreq;
2924 
2925     if (!linger_registered(lreq)) {
2926         dout("%s lreq %p not registered\n", __func__, lreq);
2927         goto out;
2928     }
2929 
2930     WARN_ON(!lreq->is_watch);
2931     dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2932          __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2933          lwork->notify.payload_len);
2934     lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2935           lwork->notify.notifier_id, lwork->notify.payload,
2936           lwork->notify.payload_len);
2937 
2938 out:
2939     ceph_msg_put(lwork->notify.msg);
2940     lwork_free(lwork);
2941 }
2942 
2943 static void do_watch_error(struct work_struct *w)
2944 {
2945     struct linger_work *lwork = container_of(w, struct linger_work, work);
2946     struct ceph_osd_linger_request *lreq = lwork->lreq;
2947 
2948     if (!linger_registered(lreq)) {
2949         dout("%s lreq %p not registered\n", __func__, lreq);
2950         goto out;
2951     }
2952 
2953     dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
2954     lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
2955 
2956 out:
2957     lwork_free(lwork);
2958 }
2959 
2960 static void queue_watch_error(struct ceph_osd_linger_request *lreq)
2961 {
2962     struct linger_work *lwork;
2963 
2964     lwork = lwork_alloc(lreq, do_watch_error);
2965     if (!lwork) {
2966         pr_err("failed to allocate error-lwork\n");
2967         return;
2968     }
2969 
2970     lwork->error.err = lreq->last_error;
2971     lwork_queue(lwork);
2972 }
2973 
2974 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2975                        int result)
2976 {
2977     if (!completion_done(&lreq->reg_commit_wait)) {
2978         lreq->reg_commit_error = (result <= 0 ? result : 0);
2979         complete_all(&lreq->reg_commit_wait);
2980     }
2981 }
2982 
2983 static void linger_commit_cb(struct ceph_osd_request *req)
2984 {
2985     struct ceph_osd_linger_request *lreq = req->r_priv;
2986 
2987     mutex_lock(&lreq->lock);
2988     if (req != lreq->reg_req) {
2989         dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
2990              __func__, lreq, lreq->linger_id, req, lreq->reg_req);
2991         goto out;
2992     }
2993 
2994     dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2995          lreq->linger_id, req->r_result);
2996     linger_reg_commit_complete(lreq, req->r_result);
2997     lreq->committed = true;
2998 
2999     if (!lreq->is_watch) {
3000         struct ceph_osd_data *osd_data =
3001             osd_req_op_data(req, 0, notify, response_data);
3002         void *p = page_address(osd_data->pages[0]);
3003 
3004         WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
3005             osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
3006 
3007         /* make note of the notify_id */
3008         if (req->r_ops[0].outdata_len >= sizeof(u64)) {
3009             lreq->notify_id = ceph_decode_64(&p);
3010             dout("lreq %p notify_id %llu\n", lreq,
3011                  lreq->notify_id);
3012         } else {
3013             dout("lreq %p no notify_id\n", lreq);
3014         }
3015     }
3016 
3017 out:
3018     mutex_unlock(&lreq->lock);
3019     linger_put(lreq);
3020 }
3021 
/*
 * Map -ENOENT to -ENOTCONN so that a delete->disconnection notification
 * and a failed reconnect that raced with the delete look the same to
 * the user.  Every other value is returned unchanged.
 */
static int normalize_watch_error(int err)
{
	return err == -ENOENT ? -ENOTCONN : err;
}
3034 
3035 static void linger_reconnect_cb(struct ceph_osd_request *req)
3036 {
3037     struct ceph_osd_linger_request *lreq = req->r_priv;
3038 
3039     mutex_lock(&lreq->lock);
3040     if (req != lreq->reg_req) {
3041         dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3042              __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3043         goto out;
3044     }
3045 
3046     dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
3047          lreq, lreq->linger_id, req->r_result, lreq->last_error);
3048     if (req->r_result < 0) {
3049         if (!lreq->last_error) {
3050             lreq->last_error = normalize_watch_error(req->r_result);
3051             queue_watch_error(lreq);
3052         }
3053     }
3054 
3055 out:
3056     mutex_unlock(&lreq->lock);
3057     linger_put(lreq);
3058 }
3059 
3060 static void send_linger(struct ceph_osd_linger_request *lreq)
3061 {
3062     struct ceph_osd_client *osdc = lreq->osdc;
3063     struct ceph_osd_request *req;
3064     int ret;
3065 
3066     verify_osdc_wrlocked(osdc);
3067     mutex_lock(&lreq->lock);
3068     dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3069 
3070     if (lreq->reg_req) {
3071         if (lreq->reg_req->r_osd)
3072             cancel_linger_request(lreq->reg_req);
3073         ceph_osdc_put_request(lreq->reg_req);
3074     }
3075 
3076     req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3077     BUG_ON(!req);
3078 
3079     target_copy(&req->r_t, &lreq->t);
3080     req->r_mtime = lreq->mtime;
3081 
3082     if (lreq->is_watch && lreq->committed) {
3083         osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
3084                       lreq->linger_id, ++lreq->register_gen);
3085         dout("lreq %p reconnect register_gen %u\n", lreq,
3086              req->r_ops[0].watch.gen);
3087         req->r_callback = linger_reconnect_cb;
3088     } else {
3089         if (lreq->is_watch) {
3090             osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
3091                           lreq->linger_id, 0);
3092         } else {
3093             lreq->notify_id = 0;
3094 
3095             refcount_inc(&lreq->request_pl->refcnt);
3096             osd_req_op_notify_init(req, 0, lreq->linger_id,
3097                            lreq->request_pl);
3098             ceph_osd_data_pages_init(
3099                 osd_req_op_data(req, 0, notify, response_data),
3100                 lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
3101         }
3102         dout("lreq %p register\n", lreq);
3103         req->r_callback = linger_commit_cb;
3104     }
3105 
3106     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3107     BUG_ON(ret);
3108 
3109     req->r_priv = linger_get(lreq);
3110     req->r_linger = true;
3111     lreq->reg_req = req;
3112     mutex_unlock(&lreq->lock);
3113 
3114     submit_request(req, true);
3115 }
3116 
3117 static void linger_ping_cb(struct ceph_osd_request *req)
3118 {
3119     struct ceph_osd_linger_request *lreq = req->r_priv;
3120 
3121     mutex_lock(&lreq->lock);
3122     if (req != lreq->ping_req) {
3123         dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3124              __func__, lreq, lreq->linger_id, req, lreq->ping_req);
3125         goto out;
3126     }
3127 
3128     dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
3129          __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
3130          lreq->last_error);
3131     if (lreq->register_gen == req->r_ops[0].watch.gen) {
3132         if (!req->r_result) {
3133             lreq->watch_valid_thru = lreq->ping_sent;
3134         } else if (!lreq->last_error) {
3135             lreq->last_error = normalize_watch_error(req->r_result);
3136             queue_watch_error(lreq);
3137         }
3138     } else {
3139         dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
3140              lreq->register_gen, req->r_ops[0].watch.gen);
3141     }
3142 
3143 out:
3144     mutex_unlock(&lreq->lock);
3145     linger_put(lreq);
3146 }
3147 
3148 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
3149 {
3150     struct ceph_osd_client *osdc = lreq->osdc;
3151     struct ceph_osd_request *req;
3152     int ret;
3153 
3154     if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
3155         dout("%s PAUSERD\n", __func__);
3156         return;
3157     }
3158 
3159     lreq->ping_sent = jiffies;
3160     dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
3161          __func__, lreq, lreq->linger_id, lreq->ping_sent,
3162          lreq->register_gen);
3163 
3164     if (lreq->ping_req) {
3165         if (lreq->ping_req->r_osd)
3166             cancel_linger_request(lreq->ping_req);
3167         ceph_osdc_put_request(lreq->ping_req);
3168     }
3169 
3170     req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3171     BUG_ON(!req);
3172 
3173     target_copy(&req->r_t, &lreq->t);
3174     osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
3175                   lreq->register_gen);
3176     req->r_callback = linger_ping_cb;
3177 
3178     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3179     BUG_ON(ret);
3180 
3181     req->r_priv = linger_get(lreq);
3182     req->r_linger = true;
3183     lreq->ping_req = req;
3184 
3185     ceph_osdc_get_request(req);
3186     account_request(req);
3187     req->r_tid = atomic64_inc_return(&osdc->last_tid);
3188     link_request(lreq->osd, req);
3189     send_request(req);
3190 }
3191 
3192 static void linger_submit(struct ceph_osd_linger_request *lreq)
3193 {
3194     struct ceph_osd_client *osdc = lreq->osdc;
3195     struct ceph_osd *osd;
3196 
3197     down_write(&osdc->lock);
3198     linger_register(lreq);
3199 
3200     calc_target(osdc, &lreq->t, false);
3201     osd = lookup_create_osd(osdc, lreq->t.osd, true);
3202     link_linger(osd, lreq);
3203 
3204     send_linger(lreq);
3205     up_write(&osdc->lock);
3206 }
3207 
3208 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
3209 {
3210     struct ceph_osd_client *osdc = lreq->osdc;
3211     struct ceph_osd_linger_request *lookup_lreq;
3212 
3213     verify_osdc_wrlocked(osdc);
3214 
3215     lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3216                        lreq->linger_id);
3217     if (!lookup_lreq)
3218         return;
3219 
3220     WARN_ON(lookup_lreq != lreq);
3221     erase_linger_mc(&osdc->linger_map_checks, lreq);
3222     linger_put(lreq);
3223 }
3224 
3225 /*
3226  * @lreq has to be both registered and linked.
3227  */
3228 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
3229 {
3230     if (lreq->ping_req && lreq->ping_req->r_osd)
3231         cancel_linger_request(lreq->ping_req);
3232     if (lreq->reg_req && lreq->reg_req->r_osd)
3233         cancel_linger_request(lreq->reg_req);
3234     cancel_linger_map_check(lreq);
3235     unlink_linger(lreq->osd, lreq);
3236     linger_unregister(lreq);
3237 }
3238 
3239 static void linger_cancel(struct ceph_osd_linger_request *lreq)
3240 {
3241     struct ceph_osd_client *osdc = lreq->osdc;
3242 
3243     down_write(&osdc->lock);
3244     if (__linger_registered(lreq))
3245         __linger_cancel(lreq);
3246     up_write(&osdc->lock);
3247 }
3248 
3249 static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3250 
3251 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3252 {
3253     struct ceph_osd_client *osdc = lreq->osdc;
3254     struct ceph_osdmap *map = osdc->osdmap;
3255 
3256     verify_osdc_wrlocked(osdc);
3257     WARN_ON(!map->epoch);
3258 
3259     if (lreq->register_gen) {
3260         lreq->map_dne_bound = map->epoch;
3261         dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3262              lreq, lreq->linger_id);
3263     } else {
3264         dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3265              __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3266              map->epoch);
3267     }
3268 
3269     if (lreq->map_dne_bound) {
3270         if (map->epoch >= lreq->map_dne_bound) {
3271             /* we had a new enough map */
3272             pr_info("linger_id %llu pool does not exist\n",
3273                 lreq->linger_id);
3274             linger_reg_commit_complete(lreq, -ENOENT);
3275             __linger_cancel(lreq);
3276         }
3277     } else {
3278         send_linger_map_check(lreq);
3279     }
3280 }
3281 
3282 static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3283 {
3284     struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3285     struct ceph_osd_linger_request *lreq;
3286     u64 linger_id = greq->private_data;
3287 
3288     WARN_ON(greq->result || !greq->u.newest);
3289 
3290     down_write(&osdc->lock);
3291     lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3292     if (!lreq) {
3293         dout("%s linger_id %llu dne\n", __func__, linger_id);
3294         goto out_unlock;
3295     }
3296 
3297     dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3298          __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3299          greq->u.newest);
3300     if (!lreq->map_dne_bound)
3301         lreq->map_dne_bound = greq->u.newest;
3302     erase_linger_mc(&osdc->linger_map_checks, lreq);
3303     check_linger_pool_dne(lreq);
3304 
3305     linger_put(lreq);
3306 out_unlock:
3307     up_write(&osdc->lock);
3308 }
3309 
3310 static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3311 {
3312     struct ceph_osd_client *osdc = lreq->osdc;
3313     struct ceph_osd_linger_request *lookup_lreq;
3314     int ret;
3315 
3316     verify_osdc_wrlocked(osdc);
3317 
3318     lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3319                        lreq->linger_id);
3320     if (lookup_lreq) {
3321         WARN_ON(lookup_lreq != lreq);
3322         return;
3323     }
3324 
3325     linger_get(lreq);
3326     insert_linger_mc(&osdc->linger_map_checks, lreq);
3327     ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3328                       linger_map_check_cb, lreq->linger_id);
3329     WARN_ON(ret);
3330 }
3331 
3332 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3333 {
3334     int ret;
3335 
3336     dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3337     ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
3338     return ret ?: lreq->reg_commit_error;
3339 }
3340 
3341 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
3342 {
3343     int ret;
3344 
3345     dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3346     ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
3347     return ret ?: lreq->notify_finish_error;
3348 }
3349 
3350 /*
3351  * Timeout callback, called every N seconds.  When 1 or more OSD
3352  * requests has been active for more than N seconds, we send a keepalive
3353  * (tag + timestamp) to its OSD to ensure any communications channel
3354  * reset is detected.
3355  */
3356 static void handle_timeout(struct work_struct *work)
3357 {
3358     struct ceph_osd_client *osdc =
3359         container_of(work, struct ceph_osd_client, timeout_work.work);
3360     struct ceph_options *opts = osdc->client->options;
3361     unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3362     unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3363     LIST_HEAD(slow_osds);
3364     struct rb_node *n, *p;
3365 
3366     dout("%s osdc %p\n", __func__, osdc);
3367     down_write(&osdc->lock);
3368 
3369     /*
3370      * ping osds that are a bit slow.  this ensures that if there
3371      * is a break in the TCP connection we will notice, and reopen
3372      * a connection with that osd (from the fault callback).
3373      */
3374     for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3375         struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3376         bool found = false;
3377 
3378         for (p = rb_first(&osd->o_requests); p; ) {
3379             struct ceph_osd_request *req =
3380                 rb_entry(p, struct ceph_osd_request, r_node);
3381 
3382             p = rb_next(p); /* abort_request() */
3383 
3384             if (time_before(req->r_stamp, cutoff)) {
3385                 dout(" req %p tid %llu on osd%d is laggy\n",
3386                      req, req->r_tid, osd->o_osd);
3387                 found = true;
3388             }
3389             if (opts->osd_request_timeout &&
3390                 time_before(req->r_start_stamp, expiry_cutoff)) {
3391                 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3392                        req->r_tid, osd->o_osd);
3393                 abort_request(req, -ETIMEDOUT);
3394             }
3395         }
3396         for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3397             struct ceph_osd_linger_request *lreq =
3398                 rb_entry(p, struct ceph_osd_linger_request, node);
3399 
3400             dout(" lreq %p linger_id %llu is served by osd%d\n",
3401                  lreq, lreq->linger_id, osd->o_osd);
3402             found = true;
3403 
3404             mutex_lock(&lreq->lock);
3405             if (lreq->is_watch && lreq->committed && !lreq->last_error)
3406                 send_linger_ping(lreq);
3407             mutex_unlock(&lreq->lock);
3408         }
3409 
3410         if (found)
3411             list_move_tail(&osd->o_keepalive_item, &slow_osds);
3412     }
3413 
3414     if (opts->osd_request_timeout) {
3415         for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3416             struct ceph_osd_request *req =
3417                 rb_entry(p, struct ceph_osd_request, r_node);
3418 
3419             p = rb_next(p); /* abort_request() */
3420 
3421             if (time_before(req->r_start_stamp, expiry_cutoff)) {
3422                 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3423                        req->r_tid, osdc->homeless_osd.o_osd);
3424                 abort_request(req, -ETIMEDOUT);
3425             }
3426         }
3427     }
3428 
3429     if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3430         maybe_request_map(osdc);
3431 
3432     while (!list_empty(&slow_osds)) {
3433         struct ceph_osd *osd = list_first_entry(&slow_osds,
3434                             struct ceph_osd,
3435                             o_keepalive_item);
3436         list_del_init(&osd->o_keepalive_item);
3437         ceph_con_keepalive(&osd->o_con);
3438     }
3439 
3440     up_write(&osdc->lock);
3441     schedule_delayed_work(&osdc->timeout_work,
3442                   osdc->client->options->osd_keepalive_timeout);
3443 }
3444 
3445 static void handle_osds_timeout(struct work_struct *work)
3446 {
3447     struct ceph_osd_client *osdc =
3448         container_of(work, struct ceph_osd_client,
3449                  osds_timeout_work.work);
3450     unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3451     struct ceph_osd *osd, *nosd;
3452 
3453     dout("%s osdc %p\n", __func__, osdc);
3454     down_write(&osdc->lock);
3455     list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3456         if (time_before(jiffies, osd->lru_ttl))
3457             break;
3458 
3459         WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3460         WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3461         close_osd(osd);
3462     }
3463 
3464     up_write(&osdc->lock);
3465     schedule_delayed_work(&osdc->osds_timeout_work,
3466                   round_jiffies_relative(delay));
3467 }
3468 
3469 static int ceph_oloc_decode(void **p, void *end,
3470                 struct ceph_object_locator *oloc)
3471 {
3472     u8 struct_v, struct_cv;
3473     u32 len;
3474     void *struct_end;
3475     int ret = 0;
3476 
3477     ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3478     struct_v = ceph_decode_8(p);
3479     struct_cv = ceph_decode_8(p);
3480     if (struct_v < 3) {
3481         pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3482             struct_v, struct_cv);
3483         goto e_inval;
3484     }
3485     if (struct_cv > 6) {
3486         pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3487             struct_v, struct_cv);
3488         goto e_inval;
3489     }
3490     len = ceph_decode_32(p);
3491     ceph_decode_need(p, end, len, e_inval);
3492     struct_end = *p + len;
3493 
3494     oloc->pool = ceph_decode_64(p);
3495     *p += 4; /* skip preferred */
3496 
3497     len = ceph_decode_32(p);
3498     if (len > 0) {
3499         pr_warn("ceph_object_locator::key is set\n");
3500         goto e_inval;
3501     }
3502 
3503     if (struct_v >= 5) {
3504         bool changed = false;
3505 
3506         len = ceph_decode_32(p);
3507         if (len > 0) {
3508             ceph_decode_need(p, end, len, e_inval);
3509             if (!oloc->pool_ns ||
3510                 ceph_compare_string(oloc->pool_ns, *p, len))
3511                 changed = true;
3512             *p += len;
3513         } else {
3514             if (oloc->pool_ns)
3515                 changed = true;
3516         }
3517         if (changed) {
3518             /* redirect changes namespace */
3519             pr_warn("ceph_object_locator::nspace is changed\n");
3520             goto e_inval;
3521         }
3522     }
3523 
3524     if (struct_v >= 6) {
3525         s64 hash = ceph_decode_64(p);
3526         if (hash != -1) {
3527             pr_warn("ceph_object_locator::hash is set\n");
3528             goto e_inval;
3529         }
3530     }
3531 
3532     /* skip the rest */
3533     *p = struct_end;
3534 out:
3535     return ret;
3536 
3537 e_inval:
3538     ret = -EINVAL;
3539     goto out;
3540 }
3541 
3542 static int ceph_redirect_decode(void **p, void *end,
3543                 struct ceph_request_redirect *redir)
3544 {
3545     u8 struct_v, struct_cv;
3546     u32 len;
3547     void *struct_end;
3548     int ret;
3549 
3550     ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3551     struct_v = ceph_decode_8(p);
3552     struct_cv = ceph_decode_8(p);
3553     if (struct_cv > 1) {
3554         pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3555             struct_v, struct_cv);
3556         goto e_inval;
3557     }
3558     len = ceph_decode_32(p);
3559     ceph_decode_need(p, end, len, e_inval);
3560     struct_end = *p + len;
3561 
3562     ret = ceph_oloc_decode(p, end, &redir->oloc);
3563     if (ret)
3564         goto out;
3565 
3566     len = ceph_decode_32(p);
3567     if (len > 0) {
3568         pr_warn("ceph_request_redirect::object_name is set\n");
3569         goto e_inval;
3570     }
3571 
3572     /* skip the rest */
3573     *p = struct_end;
3574 out:
3575     return ret;
3576 
3577 e_inval:
3578     ret = -EINVAL;
3579     goto out;
3580 }
3581 
/*
 * Decoded form of an OSD op reply message, filled in by
 * decode_MOSDOpReply().  Integer fields are stored in CPU byte order
 * except for @replay_version, which is copied verbatim from the wire.
 */
3582 struct MOSDOpReply {
3583     struct ceph_pg pgid;
3584     u64 flags;
3585     int result;
3586     u32 epoch;
	/* number of valid entries in outdata_len[] and rval[] */
3587     int num_ops;
	/* per-op payload_len taken from each encoded ceph_osd_op */
3588     u32 outdata_len[CEPH_OSD_MAX_OPS];
	/* per-op return value */
3589     s32 rval[CEPH_OSD_MAX_OPS];
3590     int retry_attempt;
	/* for message version < 5 this is the eversion encoded early in the message */
3591     struct ceph_eversion replay_version;
	/* for message version < 5 derived from replay_version.version */
3592     u64 user_version;
	/* decoded redirect, or an initialized empty oloc when none is encoded */
3593     struct ceph_request_redirect redirect;
3594 };
3595 
3596 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3597 {
3598     void *p = msg->front.iov_base;
3599     void *const end = p + msg->front.iov_len;
3600     u16 version = le16_to_cpu(msg->hdr.version);
3601     struct ceph_eversion bad_replay_version;
3602     u8 decode_redir;
3603     u32 len;
3604     int ret;
3605     int i;
3606 
3607     ceph_decode_32_safe(&p, end, len, e_inval);
3608     ceph_decode_need(&p, end, len, e_inval);
3609     p += len; /* skip oid */
3610 
3611     ret = ceph_decode_pgid(&p, end, &m->pgid);
3612     if (ret)
3613         return ret;
3614 
3615     ceph_decode_64_safe(&p, end, m->flags, e_inval);
3616     ceph_decode_32_safe(&p, end, m->result, e_inval);
3617     ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3618     memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3619     p += sizeof(bad_replay_version);
3620     ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3621 
3622     ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3623     if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3624         goto e_inval;
3625 
3626     ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3627              e_inval);
3628     for (i = 0; i < m->num_ops; i++) {
3629         struct ceph_osd_op *op = p;
3630 
3631         m->outdata_len[i] = le32_to_cpu(op->payload_len);
3632         p += sizeof(*op);
3633     }
3634 
3635     ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3636     for (i = 0; i < m->num_ops; i++)
3637         ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3638 
3639     if (version >= 5) {
3640         ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3641         memcpy(&m->replay_version, p, sizeof(m->replay_version));
3642         p += sizeof(m->replay_version);
3643         ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3644     } else {
3645         m->replay_version = bad_replay_version; /* struct */
3646         m->user_version = le64_to_cpu(m->replay_version.version);
3647     }
3648 
3649     if (version >= 6) {
3650         if (version >= 7)
3651             ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3652         else
3653             decode_redir = 1;
3654     } else {
3655         decode_redir = 0;
3656     }
3657 
3658     if (decode_redir) {
3659         ret = ceph_redirect_decode(&p, end, &m->redirect);
3660         if (ret)
3661             return ret;
3662     } else {
3663         ceph_oloc_init(&m->redirect.oloc);
3664     }
3665 
3666     return 0;
3667 
3668 e_inval:
3669     return -EINVAL;
3670 }
3671 
3672 /*
3673  * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3674  * specified.
3675  */
3676 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3677 {
3678     struct ceph_osd_client *osdc = osd->o_osdc;
3679     struct ceph_osd_request *req;
3680     struct MOSDOpReply m;
3681     u64 tid = le64_to_cpu(msg->hdr.tid);
3682     u32 data_len = 0;
3683     int ret;
3684     int i;
3685 
3686     dout("%s msg %p tid %llu\n", __func__, msg, tid);
3687 
3688     down_read(&osdc->lock);
3689     if (!osd_registered(osd)) {
3690         dout("%s osd%d unknown\n", __func__, osd->o_osd);
3691         goto out_unlock_osdc;
3692     }
3693     WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3694 
3695     mutex_lock(&osd->lock);
3696     req = lookup_request(&osd->o_requests, tid);
3697     if (!req) {
3698         dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3699         goto out_unlock_session;
3700     }
3701 
3702     m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3703     ret = decode_MOSDOpReply(msg, &m);
3704     m.redirect.oloc.pool_ns = NULL;
3705     if (ret) {
3706         pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3707                req->r_tid, ret);
3708         ceph_msg_dump(msg);
3709         goto fail_request;
3710     }
3711     dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3712          __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3713          m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3714          le64_to_cpu(m.replay_version.version), m.user_version);
3715 
3716     if (m.retry_attempt >= 0) {
3717         if (m.retry_attempt != req->r_attempts - 1) {
3718             dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3719                  req, req->r_tid, m.retry_attempt,
3720                  req->r_attempts - 1);
3721             goto out_unlock_session;
3722         }
3723     } else {
3724         WARN_ON(1); /* MOSDOpReply v4 is assumed */
3725     }
3726 
3727     if (!ceph_oloc_empty(&m.redirect.oloc)) {
3728         dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3729              m.redirect.oloc.pool);
3730         unlink_request(osd, req);
3731         mutex_unlock(&osd->lock);
3732 
3733         /*
3734          * Not ceph_oloc_copy() - changing pool_ns is not
3735          * supported.
3736          */
3737         req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3738         req->r_flags |= CEPH_OSD_FLAG_REDIRECTED |
3739                 CEPH_OSD_FLAG_IGNORE_OVERLAY |
3740                 CEPH_OSD_FLAG_IGNORE_CACHE;
3741         req->r_tid = 0;
3742         __submit_request(req, false);
3743         goto out_unlock_osdc;
3744     }
3745 
3746     if (m.result == -EAGAIN) {
3747         dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3748         unlink_request(osd, req);
3749         mutex_unlock(&osd->lock);
3750 
3751         /*
3752          * The object is missing on the replica or not (yet)
3753          * readable.  Clear pgid to force a resend to the primary
3754          * via legacy_change.
3755          */
3756         req->r_t.pgid.pool = 0;
3757         req->r_t.pgid.seed = 0;
3758         WARN_ON(!req->r_t.used_replica);
3759         req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3760                   CEPH_OSD_FLAG_LOCALIZE_READS);
3761         req->r_tid = 0;
3762         __submit_request(req, false);
3763         goto out_unlock_osdc;
3764     }
3765 
3766     if (m.num_ops != req->r_num_ops) {
3767         pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3768                req->r_num_ops, req->r_tid);
3769         goto fail_request;
3770     }
3771     for (i = 0; i < req->r_num_ops; i++) {
3772         dout(" req %p tid %llu op %d rval %d len %u\n", req,
3773              req->r_tid, i, m.rval[i], m.outdata_len[i]);
3774         req->r_ops[i].rval = m.rval[i];
3775         req->r_ops[i].outdata_len = m.outdata_len[i];
3776         data_len += m.outdata_len[i];
3777     }
3778     if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3779         pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3780                le32_to_cpu(msg->hdr.data_len), req->r_tid);
3781         goto fail_request;
3782     }
3783     dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3784          req, req->r_tid, m.result, data_len);
3785 
3786     /*
3787      * Since we only ever request ONDISK, we should only ever get
3788      * one (type of) reply back.
3789      */
3790     WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3791     req->r_result = m.result ?: data_len;
3792     finish_request(req);
3793     mutex_unlock(&osd->lock);
3794     up_read(&osdc->lock);
3795 
3796     __complete_request(req);
3797     return;
3798 
3799 fail_request:
3800     complete_request(req, -EIO);
3801 out_unlock_session:
3802     mutex_unlock(&osd->lock);
3803 out_unlock_osdc:
3804     up_read(&osdc->lock);
3805 }
3806 
3807 static void set_pool_was_full(struct ceph_osd_client *osdc)
3808 {
3809     struct rb_node *n;
3810 
3811     for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3812         struct ceph_pg_pool_info *pi =
3813             rb_entry(n, struct ceph_pg_pool_info, node);
3814 
3815         pi->was_full = __pool_full(pi);
3816     }
3817 }
3818 
3819 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3820 {
3821     struct ceph_pg_pool_info *pi;
3822 
3823     pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3824     if (!pi)
3825         return false;
3826 
3827     return pi->was_full && !__pool_full(pi);
3828 }
3829 
3830 static enum calc_target_result
3831 recalc_linger_target(struct ceph_osd_linger_request *lreq)
3832 {
3833     struct ceph_osd_client *osdc = lreq->osdc;
3834     enum calc_target_result ct_res;
3835 
3836     ct_res = calc_target(osdc, &lreq->t, true);
3837     if (ct_res == CALC_TARGET_NEED_RESEND) {
3838         struct ceph_osd *osd;
3839 
3840         osd = lookup_create_osd(osdc, lreq->t.osd, true);
3841         if (osd != lreq->osd) {
3842             unlink_linger(lreq->osd, lreq);
3843             link_linger(osd, lreq);
3844         }
3845     }
3846 
3847     return ct_res;
3848 }
3849 
3850 /*
3851  * Requeue requests whose mapping to an OSD has changed.
3852  */
3853 static void scan_requests(struct ceph_osd *osd,
3854               bool force_resend,
3855               bool cleared_full,
3856               bool check_pool_cleared_full,
3857               struct rb_root *need_resend,
3858               struct list_head *need_resend_linger)
3859 {
3860     struct ceph_osd_client *osdc = osd->o_osdc;
3861     struct rb_node *n;
3862     bool force_resend_writes;
3863 
3864     for (n = rb_first(&osd->o_linger_requests); n; ) {
3865         struct ceph_osd_linger_request *lreq =
3866             rb_entry(n, struct ceph_osd_linger_request, node);
3867         enum calc_target_result ct_res;
3868 
3869         n = rb_next(n); /* recalc_linger_target() */
3870 
3871         dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3872              lreq->linger_id);
3873         ct_res = recalc_linger_target(lreq);
3874         switch (ct_res) {
3875         case CALC_TARGET_NO_ACTION:
3876             force_resend_writes = cleared_full ||
3877                 (check_pool_cleared_full &&
3878                  pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3879             if (!force_resend && !force_resend_writes)
3880                 break;
3881 
3882             fallthrough;
3883         case CALC_TARGET_NEED_RESEND:
3884             cancel_linger_map_check(lreq);
3885             /*
3886              * scan_requests() for the previous epoch(s)
3887              * may have already added it to the list, since
3888              * it's not unlinked here.
3889              */
3890             if (list_empty(&lreq->scan_item))
3891                 list_add_tail(&lreq->scan_item, need_resend_linger);
3892             break;
3893         case CALC_TARGET_POOL_DNE:
3894             list_del_init(&lreq->scan_item);
3895             check_linger_pool_dne(lreq);
3896             break;
3897         }
3898     }
3899 
3900     for (n = rb_first(&osd->o_requests); n; ) {
3901         struct ceph_osd_request *req =
3902             rb_entry(n, struct ceph_osd_request, r_node);
3903         enum calc_target_result ct_res;
3904 
3905         n = rb_next(n); /* unlink_request(), check_pool_dne() */
3906 
3907         dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3908         ct_res = calc_target(osdc, &req->r_t, false);
3909         switch (ct_res) {
3910         case CALC_TARGET_NO_ACTION:
3911             force_resend_writes = cleared_full ||
3912                 (check_pool_cleared_full &&
3913                  pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3914             if (!force_resend &&
3915                 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3916                  !force_resend_writes))
3917                 break;
3918 
3919             fallthrough;
3920         case CALC_TARGET_NEED_RESEND:
3921             cancel_map_check(req);
3922             unlink_request(osd, req);
3923             insert_request(need_resend, req);
3924             break;
3925         case CALC_TARGET_POOL_DNE:
3926             check_pool_dne(req);
3927             break;
3928         }
3929     }
3930 }
3931 
3932 static int handle_one_map(struct ceph_osd_client *osdc,
3933               void *p, void *end, bool incremental,
3934               struct rb_root *need_resend,
3935               struct list_head *need_resend_linger)
3936 {
3937     struct ceph_osdmap *newmap;
3938     struct rb_node *n;
3939     bool skipped_map = false;
3940     bool was_full;
3941 
3942     was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3943     set_pool_was_full(osdc);
3944 
3945     if (incremental)
3946         newmap = osdmap_apply_incremental(&p, end,
3947                           ceph_msgr2(osdc->client),
3948                           osdc->osdmap);
3949     else
3950         newmap = ceph_osdmap_decode(&p, end, ceph_msgr2(osdc->client));
3951     if (IS_ERR(newmap))
3952         return PTR_ERR(newmap);
3953 
3954     if (newmap != osdc->osdmap) {
3955         /*
3956          * Preserve ->was_full before destroying the old map.
3957          * For pools that weren't in the old map, ->was_full
3958          * should be false.
3959          */
3960         for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
3961             struct ceph_pg_pool_info *pi =
3962                 rb_entry(n, struct ceph_pg_pool_info, node);
3963             struct ceph_pg_pool_info *old_pi;
3964 
3965             old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
3966             if (old_pi)
3967                 pi->was_full = old_pi->was_full;
3968             else
3969                 WARN_ON(pi->was_full);
3970         }
3971 
3972         if (osdc->osdmap->epoch &&
3973             osdc->osdmap->epoch + 1 < newmap->epoch) {
3974             WARN_ON(incremental);
3975             skipped_map = true;
3976         }
3977 
3978         ceph_osdmap_destroy(osdc->osdmap);
3979         osdc->osdmap = newmap;
3980     }
3981 
3982     was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3983     scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
3984               need_resend, need_resend_linger);
3985 
3986     for (n = rb_first(&osdc->osds); n; ) {
3987         struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3988 
3989         n = rb_next(n); /* close_osd() */
3990 
3991         scan_requests(osd, skipped_map, was_full, true, need_resend,
3992                   need_resend_linger);
3993         if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
3994             memcmp(&osd->o_con.peer_addr,
3995                ceph_osd_addr(osdc->osdmap, osd->o_osd),
3996                sizeof(struct ceph_entity_addr)))
3997             close_osd(osd);
3998     }
3999 
4000     return 0;
4001 }
4002 
4003 static void kick_requests(struct ceph_osd_client *osdc,
4004               struct rb_root *need_resend,
4005               struct list_head *need_resend_linger)
4006 {
4007     struct ceph_osd_linger_request *lreq, *nlreq;
4008     enum calc_target_result ct_res;
4009     struct rb_node *n;
4010 
4011     /* make sure need_resend targets reflect latest map */
4012     for (n = rb_first(need_resend); n; ) {
4013         struct ceph_osd_request *req =
4014             rb_entry(n, struct ceph_osd_request, r_node);
4015 
4016         n = rb_next(n);
4017 
4018         if (req->r_t.epoch < osdc->osdmap->epoch) {
4019             ct_res = calc_target(osdc, &req->r_t, false);
4020             if (ct_res == CALC_TARGET_POOL_DNE) {
4021                 erase_request(need_resend, req);
4022                 check_pool_dne(req);
4023             }
4024         }
4025     }
4026 
4027     for (n = rb_first(need_resend); n; ) {
4028         struct ceph_osd_request *req =
4029             rb_entry(n, struct ceph_osd_request, r_node);
4030         struct ceph_osd *osd;
4031 
4032         n = rb_next(n);
4033         erase_request(need_resend, req); /* before link_request() */
4034 
4035         osd = lookup_create_osd(osdc, req->r_t.osd, true);
4036         link_request(osd, req);
4037         if (!req->r_linger) {
4038             if (!osd_homeless(osd) && !req->r_t.paused)
4039                 send_request(req);
4040         } else {
4041             cancel_linger_request(req);
4042         }
4043     }
4044 
4045     list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
4046         if (!osd_homeless(lreq->osd))
4047             send_linger(lreq);
4048 
4049         list_del_init(&lreq->scan_item);
4050     }
4051 }
4052 
4053 /*
4054  * Process updated osd map.
4055  *
4056  * The message contains any number of incremental and full maps, normally
4057  * indicating some sort of topology change in the cluster.  Kick requests
4058  * off to different OSDs as needed.
4059  */
4060 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
4061 {
4062     void *p = msg->front.iov_base;
4063     void *const end = p + msg->front.iov_len;
4064     u32 nr_maps, maplen;
4065     u32 epoch;
4066     struct ceph_fsid fsid;
4067     struct rb_root need_resend = RB_ROOT;
4068     LIST_HEAD(need_resend_linger);
4069     bool handled_incremental = false;
4070     bool was_pauserd, was_pausewr;
4071     bool pauserd, pausewr;
4072     int err;
4073 
4074     dout("%s have %u\n", __func__, osdc->osdmap->epoch);
4075     down_write(&osdc->lock);
4076 
4077     /* verify fsid */
4078     ceph_decode_need(&p, end, sizeof(fsid), bad);
4079     ceph_decode_copy(&p, &fsid, sizeof(fsid));
4080     if (ceph_check_fsid(osdc->client, &fsid) < 0)
4081         goto bad;
4082 
4083     was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4084     was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4085               ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4086               have_pool_full(osdc);
4087 
4088     /* incremental maps */
4089     ceph_decode_32_safe(&p, end, nr_maps, bad);
4090     dout(" %d inc maps\n", nr_maps);
4091     while (nr_maps > 0) {
4092         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4093         epoch = ceph_decode_32(&p);
4094         maplen = ceph_decode_32(&p);
4095         ceph_decode_need(&p, end, maplen, bad);
4096         if (osdc->osdmap->epoch &&
4097             osdc->osdmap->epoch + 1 == epoch) {
4098             dout("applying incremental map %u len %d\n",
4099                  epoch, maplen);
4100             err = handle_one_map(osdc, p, p + maplen, true,
4101                          &need_resend, &need_resend_linger);
4102             if (err)
4103                 goto bad;
4104             handled_incremental = true;
4105         } else {
4106             dout("ignoring incremental map %u len %d\n",
4107                  epoch, maplen);
4108         }
4109         p += maplen;
4110         nr_maps--;
4111     }
4112     if (handled_incremental)
4113         goto done;
4114 
4115     /* full maps */
4116     ceph_decode_32_safe(&p, end, nr_maps, bad);
4117     dout(" %d full maps\n", nr_maps);
4118     while (nr_maps) {
4119         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4120         epoch = ceph_decode_32(&p);
4121         maplen = ceph_decode_32(&p);
4122         ceph_decode_need(&p, end, maplen, bad);
4123         if (nr_maps > 1) {
4124             dout("skipping non-latest full map %u len %d\n",
4125                  epoch, maplen);
4126         } else if (osdc->osdmap->epoch >= epoch) {
4127             dout("skipping full map %u len %d, "
4128                  "older than our %u\n", epoch, maplen,
4129                  osdc->osdmap->epoch);
4130         } else {
4131             dout("taking full map %u len %d\n", epoch, maplen);
4132             err = handle_one_map(osdc, p, p + maplen, false,
4133                          &need_resend, &need_resend_linger);
4134             if (err)
4135                 goto bad;
4136         }
4137         p += maplen;
4138         nr_maps--;
4139     }
4140 
4141 done:
4142     /*
4143      * subscribe to subsequent osdmap updates if full to ensure
4144      * we find out when we are no longer full and stop returning
4145      * ENOSPC.
4146      */
4147     pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4148     pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4149           ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4150           have_pool_full(osdc);
4151     if (was_pauserd || was_pausewr || pauserd || pausewr ||
4152         osdc->osdmap->epoch < osdc->epoch_barrier)
4153         maybe_request_map(osdc);
4154 
4155     kick_requests(osdc, &need_resend, &need_resend_linger);
4156 
4157     ceph_osdc_abort_on_full(osdc);
4158     ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
4159               osdc->osdmap->epoch);
4160     up_write(&osdc->lock);
4161     wake_up_all(&osdc->client->auth_wq);
4162     return;
4163 
4164 bad:
4165     pr_err("osdc handle_map corrupt msg\n");
4166     ceph_msg_dump(msg);
4167     up_write(&osdc->lock);
4168 }
4169 
4170 /*
4171  * Resubmit requests pending on the given osd.
4172  */
4173 static void kick_osd_requests(struct ceph_osd *osd)
4174 {
4175     struct rb_node *n;
4176 
4177     clear_backoffs(osd);
4178 
4179     for (n = rb_first(&osd->o_requests); n; ) {
4180         struct ceph_osd_request *req =
4181             rb_entry(n, struct ceph_osd_request, r_node);
4182 
4183         n = rb_next(n); /* cancel_linger_request() */
4184 
4185         if (!req->r_linger) {
4186             if (!req->r_t.paused)
4187                 send_request(req);
4188         } else {
4189             cancel_linger_request(req);
4190         }
4191     }
4192     for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4193         struct ceph_osd_linger_request *lreq =
4194             rb_entry(n, struct ceph_osd_linger_request, node);
4195 
4196         send_linger(lreq);
4197     }
4198 }
4199 
4200 /*
4201  * If the osd connection drops, we need to resubmit all requests.
4202  */
4203 static void osd_fault(struct ceph_connection *con)
4204 {
4205     struct ceph_osd *osd = con->private;
4206     struct ceph_osd_client *osdc = osd->o_osdc;
4207 
4208     dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4209 
4210     down_write(&osdc->lock);
4211     if (!osd_registered(osd)) {
4212         dout("%s osd%d unknown\n", __func__, osd->o_osd);
4213         goto out_unlock;
4214     }
4215 
4216     if (!reopen_osd(osd))
4217         kick_osd_requests(osd);
4218     maybe_request_map(osdc);
4219 
4220 out_unlock:
4221     up_write(&osdc->lock);
4222 }
4223 
4224 struct MOSDBackoff {
4225     struct ceph_spg spgid;
4226     u32 map_epoch;
4227     u8 op;
4228     u64 id;
4229     struct ceph_hobject_id *begin;
4230     struct ceph_hobject_id *end;
4231 };
4232 
4233 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
4234 {
4235     void *p = msg->front.iov_base;
4236     void *const end = p + msg->front.iov_len;
4237     u8 struct_v;
4238     u32 struct_len;
4239     int ret;
4240 
4241     ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
4242     if (ret)
4243         return ret;
4244 
4245     ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
4246     if (ret)
4247         return ret;
4248 
4249     ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
4250     ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4251     ceph_decode_8_safe(&p, end, m->op, e_inval);
4252     ceph_decode_64_safe(&p, end, m->id, e_inval);
4253 
4254     m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4255     if (!m->begin)
4256         return -ENOMEM;
4257 
4258     ret = decode_hoid(&p, end, m->begin);
4259     if (ret) {
4260         free_hoid(m->begin);
4261         return ret;
4262     }
4263 
4264     m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4265     if (!m->end) {
4266         free_hoid(m->begin);
4267         return -ENOMEM;
4268     }
4269 
4270     ret = decode_hoid(&p, end, m->end);
4271     if (ret) {
4272         free_hoid(m->begin);
4273         free_hoid(m->end);
4274         return ret;
4275     }
4276 
4277     return 0;
4278 
4279 e_inval:
4280     return -EINVAL;
4281 }
4282 
4283 static struct ceph_msg *create_backoff_message(
4284                 const struct ceph_osd_backoff *backoff,
4285                 u32 map_epoch)
4286 {
4287     struct ceph_msg *msg;
4288     void *p, *end;
4289     int msg_size;
4290 
4291     msg_size = CEPH_ENCODING_START_BLK_LEN +
4292             CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4293     msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4294     msg_size += CEPH_ENCODING_START_BLK_LEN +
4295             hoid_encoding_size(backoff->begin);
4296     msg_size += CEPH_ENCODING_START_BLK_LEN +
4297             hoid_encoding_size(backoff->end);
4298 
4299     msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4300     if (!msg)
4301         return NULL;
4302 
4303     p = msg->front.iov_base;
4304     end = p + msg->front_alloc_len;
4305 
4306     encode_spgid(&p, &backoff->spgid);
4307     ceph_encode_32(&p, map_epoch);
4308     ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4309     ceph_encode_64(&p, backoff->id);
4310     encode_hoid(&p, end, backoff->begin);
4311     encode_hoid(&p, end, backoff->end);
4312     BUG_ON(p != end);
4313 
4314     msg->front.iov_len = p - msg->front.iov_base;
4315     msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4316     msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4317 
4318     return msg;
4319 }
4320 
4321 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4322 {
4323     struct ceph_spg_mapping *spg;
4324     struct ceph_osd_backoff *backoff;
4325     struct ceph_msg *msg;
4326 
4327     dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4328          m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4329 
4330     spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4331     if (!spg) {
4332         spg = alloc_spg_mapping();
4333         if (!spg) {
4334             pr_err("%s failed to allocate spg\n", __func__);
4335             return;
4336         }
4337         spg->spgid = m->spgid; /* struct */
4338         insert_spg_mapping(&osd->o_backoff_mappings, spg);
4339     }
4340 
4341     backoff = alloc_backoff();
4342     if (!backoff) {
4343         pr_err("%s failed to allocate backoff\n", __func__);
4344         return;
4345     }
4346     backoff->spgid = m->spgid; /* struct */
4347     backoff->id = m->id;
4348     backoff->begin = m->begin;
4349     m->begin = NULL; /* backoff now owns this */
4350     backoff->end = m->end;
4351     m->end = NULL;   /* ditto */
4352 
4353     insert_backoff(&spg->backoffs, backoff);
4354     insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4355 
4356     /*
4357      * Ack with original backoff's epoch so that the OSD can
4358      * discard this if there was a PG split.
4359      */
4360     msg = create_backoff_message(backoff, m->map_epoch);
4361     if (!msg) {
4362         pr_err("%s failed to allocate msg\n", __func__);
4363         return;
4364     }
4365     ceph_con_send(&osd->o_con, msg);
4366 }
4367 
4368 static bool target_contained_by(const struct ceph_osd_request_target *t,
4369                 const struct ceph_hobject_id *begin,
4370                 const struct ceph_hobject_id *end)
4371 {
4372     struct ceph_hobject_id hoid;
4373     int cmp;
4374 
4375     hoid_fill_from_target(&hoid, t);
4376     cmp = hoid_compare(&hoid, begin);
4377     return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4378 }
4379 
4380 static void handle_backoff_unblock(struct ceph_osd *osd,
4381                    const struct MOSDBackoff *m)
4382 {
4383     struct ceph_spg_mapping *spg;
4384     struct ceph_osd_backoff *backoff;
4385     struct rb_node *n;
4386 
4387     dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4388          m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4389 
4390     backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4391     if (!backoff) {
4392         pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4393                __func__, osd->o_osd, m->spgid.pgid.pool,
4394                m->spgid.pgid.seed, m->spgid.shard, m->id);
4395         return;
4396     }
4397 
4398     if (hoid_compare(backoff->begin, m->begin) &&
4399         hoid_compare(backoff->end, m->end)) {
4400         pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4401                __func__, osd->o_osd, m->spgid.pgid.pool,
4402                m->spgid.pgid.seed, m->spgid.shard, m->id);
4403         /* unblock it anyway... */
4404     }
4405 
4406     spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4407     BUG_ON(!spg);
4408 
4409     erase_backoff(&spg->backoffs, backoff);
4410     erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4411     free_backoff(backoff);
4412 
4413     if (RB_EMPTY_ROOT(&spg->backoffs)) {
4414         erase_spg_mapping(&osd->o_backoff_mappings, spg);
4415         free_spg_mapping(spg);
4416     }
4417 
4418     for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4419         struct ceph_osd_request *req =
4420             rb_entry(n, struct ceph_osd_request, r_node);
4421 
4422         if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4423             /*
4424              * Match against @m, not @backoff -- the PG may
4425              * have split on the OSD.
4426              */
4427             if (target_contained_by(&req->r_t, m->begin, m->end)) {
4428                 /*
4429                  * If no other installed backoff applies,
4430                  * resend.
4431                  */
4432                 send_request(req);
4433             }
4434         }
4435     }
4436 }
4437 
4438 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4439 {
4440     struct ceph_osd_client *osdc = osd->o_osdc;
4441     struct MOSDBackoff m;
4442     int ret;
4443 
4444     down_read(&osdc->lock);
4445     if (!osd_registered(osd)) {
4446         dout("%s osd%d unknown\n", __func__, osd->o_osd);
4447         up_read(&osdc->lock);
4448         return;
4449     }
4450     WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4451 
4452     mutex_lock(&osd->lock);
4453     ret = decode_MOSDBackoff(msg, &m);
4454     if (ret) {
4455         pr_err("failed to decode MOSDBackoff: %d\n", ret);
4456         ceph_msg_dump(msg);
4457         goto out_unlock;
4458     }
4459 
4460     switch (m.op) {
4461     case CEPH_OSD_BACKOFF_OP_BLOCK:
4462         handle_backoff_block(osd, &m);
4463         break;
4464     case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4465         handle_backoff_unblock(osd, &m);
4466         break;
4467     default:
4468         pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4469     }
4470 
4471     free_hoid(m.begin);
4472     free_hoid(m.end);
4473 
4474 out_unlock:
4475     mutex_unlock(&osd->lock);
4476     up_read(&osdc->lock);
4477 }
4478 
4479 /*
4480  * Process osd watch notifications
4481  */
4482 static void handle_watch_notify(struct ceph_osd_client *osdc,
4483                 struct ceph_msg *msg)
4484 {
4485     void *p = msg->front.iov_base;
4486     void *const end = p + msg->front.iov_len;
4487     struct ceph_osd_linger_request *lreq;
4488     struct linger_work *lwork;
4489     u8 proto_ver, opcode;
4490     u64 cookie, notify_id;
4491     u64 notifier_id = 0;
4492     s32 return_code = 0;
4493     void *payload = NULL;
4494     u32 payload_len = 0;
4495 
4496     ceph_decode_8_safe(&p, end, proto_ver, bad);
4497     ceph_decode_8_safe(&p, end, opcode, bad);
4498     ceph_decode_64_safe(&p, end, cookie, bad);
4499     p += 8; /* skip ver */
4500     ceph_decode_64_safe(&p, end, notify_id, bad);
4501 
4502     if (proto_ver >= 1) {
4503         ceph_decode_32_safe(&p, end, payload_len, bad);
4504         ceph_decode_need(&p, end, payload_len, bad);
4505         payload = p;
4506         p += payload_len;
4507     }
4508 
4509     if (le16_to_cpu(msg->hdr.version) >= 2)
4510         ceph_decode_32_safe(&p, end, return_code, bad);
4511 
4512     if (le16_to_cpu(msg->hdr.version) >= 3)
4513         ceph_decode_64_safe(&p, end, notifier_id, bad);
4514 
4515     down_read(&osdc->lock);
4516     lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4517     if (!lreq) {
4518         dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4519              cookie);
4520         goto out_unlock_osdc;
4521     }
4522 
4523     mutex_lock(&lreq->lock);
4524     dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4525          opcode, cookie, lreq, lreq->is_watch);
4526     if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4527         if (!lreq->last_error) {
4528             lreq->last_error = -ENOTCONN;
4529             queue_watch_error(lreq);
4530         }
4531     } else if (!lreq->is_watch) {
4532         /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4533         if (lreq->notify_id && lreq->notify_id != notify_id) {
4534             dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4535                  lreq->notify_id, notify_id);
4536         } else if (!completion_done(&lreq->notify_finish_wait)) {
4537             struct ceph_msg_data *data =
4538                 msg->num_data_items ? &msg->data[0] : NULL;
4539 
4540             if (data) {
4541                 if (lreq->preply_pages) {
4542                     WARN_ON(data->type !=
4543                             CEPH_MSG_DATA_PAGES);
4544                     *lreq->preply_pages = data->pages;
4545                     *lreq->preply_len = data->length;
4546                     data->own_pages = false;
4547                 }
4548             }
4549             lreq->notify_finish_error = return_code;
4550             complete_all(&lreq->notify_finish_wait);
4551         }
4552     } else {
4553         /* CEPH_WATCH_EVENT_NOTIFY */
4554         lwork = lwork_alloc(lreq, do_watch_notify);
4555         if (!lwork) {
4556             pr_err("failed to allocate notify-lwork\n");
4557             goto out_unlock_lreq;
4558         }
4559 
4560         lwork->notify.notify_id = notify_id;
4561         lwork->notify.notifier_id = notifier_id;
4562         lwork->notify.payload = payload;
4563         lwork->notify.payload_len = payload_len;
4564         lwork->notify.msg = ceph_msg_get(msg);
4565         lwork_queue(lwork);
4566     }
4567 
4568 out_unlock_lreq:
4569     mutex_unlock(&lreq->lock);
4570 out_unlock_osdc:
4571     up_read(&osdc->lock);
4572     return;
4573 
4574 bad:
4575     pr_err("osdc handle_watch_notify corrupt msg\n");
4576 }
4577 
4578 /*
4579  * Register request, send initial attempt.
4580  */
4581 void ceph_osdc_start_request(struct ceph_osd_client *osdc,
4582                  struct ceph_osd_request *req)
4583 {
4584     down_read(&osdc->lock);
4585     submit_request(req, false);
4586     up_read(&osdc->lock);
4587 }
4588 EXPORT_SYMBOL(ceph_osdc_start_request);
4589 
4590 /*
4591  * Unregister request.  If @req was registered, it isn't completed:
4592  * r_result isn't set and __complete_request() isn't invoked.
4593  *
4594  * If @req wasn't registered, this call may have raced with
4595  * handle_reply(), in which case r_result would already be set and
4596  * __complete_request() would be getting invoked, possibly even
4597  * concurrently with this call.
4598  */
4599 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4600 {
4601     struct ceph_osd_client *osdc = req->r_osdc;
4602 
4603     down_write(&osdc->lock);
4604     if (req->r_osd)
4605         cancel_request(req);
4606     up_write(&osdc->lock);
4607 }
4608 EXPORT_SYMBOL(ceph_osdc_cancel_request);
4609 
4610 /*
4611  * @timeout: in jiffies, 0 means "wait forever"
4612  */
4613 static int wait_request_timeout(struct ceph_osd_request *req,
4614                 unsigned long timeout)
4615 {
4616     long left;
4617 
4618     dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4619     left = wait_for_completion_killable_timeout(&req->r_completion,
4620                         ceph_timeout_jiffies(timeout));
4621     if (left <= 0) {
4622         left = left ?: -ETIMEDOUT;
4623         ceph_osdc_cancel_request(req);
4624     } else {
4625         left = req->r_result; /* completed */
4626     }
4627 
4628     return left;
4629 }
4630 
/*
 * Wait for @req to complete, without a timeout.  Returns whatever
 * wait_request_timeout() returns.  @osdc is not used here.
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	const unsigned long no_timeout = 0; /* 0 means "wait forever" */

	return wait_request_timeout(req, no_timeout);
}
4639 EXPORT_SYMBOL(ceph_osdc_wait_request);
4640 
4641 /*
4642  * sync - wait for all in-flight requests to flush.  avoid starvation.
4643  */
4644 void ceph_osdc_sync(struct ceph_osd_client *osdc)
4645 {
4646     struct rb_node *n, *p;
4647     u64 last_tid = atomic64_read(&osdc->last_tid);
4648 
4649 again:
4650     down_read(&osdc->lock);
4651     for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4652         struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4653 
4654         mutex_lock(&osd->lock);
4655         for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4656             struct ceph_osd_request *req =
4657                 rb_entry(p, struct ceph_osd_request, r_node);
4658 
4659             if (req->r_tid > last_tid)
4660                 break;
4661 
4662             if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4663                 continue;
4664 
4665             ceph_osdc_get_request(req);
4666             mutex_unlock(&osd->lock);
4667             up_read(&osdc->lock);
4668             dout("%s waiting on req %p tid %llu last_tid %llu\n",
4669                  __func__, req, req->r_tid, last_tid);
4670             wait_for_completion(&req->r_completion);
4671             ceph_osdc_put_request(req);
4672             goto again;
4673         }
4674 
4675         mutex_unlock(&osd->lock);
4676     }
4677 
4678     up_read(&osdc->lock);
4679     dout("%s done last_tid %llu\n", __func__, last_tid);
4680 }
4681 EXPORT_SYMBOL(ceph_osdc_sync);
4682 
4683 /*
4684  * Returns a handle, caller owns a ref.
4685  */
4686 struct ceph_osd_linger_request *
4687 ceph_osdc_watch(struct ceph_osd_client *osdc,
4688         struct ceph_object_id *oid,
4689         struct ceph_object_locator *oloc,
4690         rados_watchcb2_t wcb,
4691         rados_watcherrcb_t errcb,
4692         void *data)
4693 {
4694     struct ceph_osd_linger_request *lreq;
4695     int ret;
4696 
4697     lreq = linger_alloc(osdc);
4698     if (!lreq)
4699         return ERR_PTR(-ENOMEM);
4700 
4701     lreq->is_watch = true;
4702     lreq->wcb = wcb;
4703     lreq->errcb = errcb;
4704     lreq->data = data;
4705     lreq->watch_valid_thru = jiffies;
4706 
4707     ceph_oid_copy(&lreq->t.base_oid, oid);
4708     ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4709     lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4710     ktime_get_real_ts64(&lreq->mtime);
4711 
4712     linger_submit(lreq);
4713     ret = linger_reg_commit_wait(lreq);
4714     if (ret) {
4715         linger_cancel(lreq);
4716         goto err_put_lreq;
4717     }
4718 
4719     return lreq;
4720 
4721 err_put_lreq:
4722     linger_put(lreq);
4723     return ERR_PTR(ret);
4724 }
4725 EXPORT_SYMBOL(ceph_osdc_watch);
4726 
4727 /*
4728  * Releases a ref.
4729  *
4730  * Times out after mount_timeout to preserve rbd unmap behaviour
4731  * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4732  * with mount_timeout").
4733  */
4734 int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4735               struct ceph_osd_linger_request *lreq)
4736 {
4737     struct ceph_options *opts = osdc->client->options;
4738     struct ceph_osd_request *req;
4739     int ret;
4740 
4741     req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4742     if (!req)
4743         return -ENOMEM;
4744 
4745     ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4746     ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4747     req->r_flags = CEPH_OSD_FLAG_WRITE;
4748     ktime_get_real_ts64(&req->r_mtime);
4749     osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
4750                   lreq->linger_id, 0);
4751 
4752     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4753     if (ret)
4754         goto out_put_req;
4755 
4756     ceph_osdc_start_request(osdc, req);
4757     linger_cancel(lreq);
4758     linger_put(lreq);
4759     ret = wait_request_timeout(req, opts->mount_timeout);
4760 
4761 out_put_req:
4762     ceph_osdc_put_request(req);
4763     return ret;
4764 }
4765 EXPORT_SYMBOL(ceph_osdc_unwatch);
4766 
4767 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4768                       u64 notify_id, u64 cookie, void *payload,
4769                       u32 payload_len)
4770 {
4771     struct ceph_osd_req_op *op;
4772     struct ceph_pagelist *pl;
4773     int ret;
4774 
4775     op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4776 
4777     pl = ceph_pagelist_alloc(GFP_NOIO);
4778     if (!pl)
4779         return -ENOMEM;
4780 
4781     ret = ceph_pagelist_encode_64(pl, notify_id);
4782     ret |= ceph_pagelist_encode_64(pl, cookie);
4783     if (payload) {
4784         ret |= ceph_pagelist_encode_32(pl, payload_len);
4785         ret |= ceph_pagelist_append(pl, payload, payload_len);
4786     } else {
4787         ret |= ceph_pagelist_encode_32(pl, 0);
4788     }
4789     if (ret) {
4790         ceph_pagelist_release(pl);
4791         return -ENOMEM;
4792     }
4793 
4794     ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4795     op->indata_len = pl->length;
4796     return 0;
4797 }
4798 
4799 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4800              struct ceph_object_id *oid,
4801              struct ceph_object_locator *oloc,
4802              u64 notify_id,
4803              u64 cookie,
4804              void *payload,
4805              u32 payload_len)
4806 {
4807     struct ceph_osd_request *req;
4808     int ret;
4809 
4810     req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4811     if (!req)
4812         return -ENOMEM;
4813 
4814     ceph_oid_copy(&req->r_base_oid, oid);
4815     ceph_oloc_copy(&req->r_base_oloc, oloc);
4816     req->r_flags = CEPH_OSD_FLAG_READ;
4817 
4818     ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4819                      payload_len);
4820     if (ret)
4821         goto out_put_req;
4822 
4823     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4824     if (ret)
4825         goto out_put_req;
4826 
4827     ceph_osdc_start_request(osdc, req);
4828     ret = ceph_osdc_wait_request(osdc, req);
4829 
4830 out_put_req:
4831     ceph_osdc_put_request(req);
4832     return ret;
4833 }
4834 EXPORT_SYMBOL(ceph_osdc_notify_ack);
4835 
4836 /*
4837  * @timeout: in seconds
4838  *
4839  * @preply_{pages,len} are initialized both on success and error.
4840  * The caller is responsible for:
4841  *
4842  *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4843  */
4844 int ceph_osdc_notify(struct ceph_osd_client *osdc,
4845              struct ceph_object_id *oid,
4846              struct ceph_object_locator *oloc,
4847              void *payload,
4848              u32 payload_len,
4849              u32 timeout,
4850              struct page ***preply_pages,
4851              size_t *preply_len)
4852 {
4853     struct ceph_osd_linger_request *lreq;
4854     int ret;
4855 
4856     WARN_ON(!timeout);
4857     if (preply_pages) {
4858         *preply_pages = NULL;
4859         *preply_len = 0;
4860     }
4861 
4862     lreq = linger_alloc(osdc);
4863     if (!lreq)
4864         return -ENOMEM;
4865 
4866     lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
4867     if (!lreq->request_pl) {
4868         ret = -ENOMEM;
4869         goto out_put_lreq;
4870     }
4871 
4872     ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
4873     ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
4874     ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
4875     ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
4876     if (ret) {
4877         ret = -ENOMEM;
4878         goto out_put_lreq;
4879     }
4880 
4881     /* for notify_id */
4882     lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
4883     if (IS_ERR(lreq->notify_id_pages)) {
4884         ret = PTR_ERR(lreq->notify_id_pages);
4885         lreq->notify_id_pages = NULL;
4886         goto out_put_lreq;
4887     }
4888 
4889     lreq->preply_pages = preply_pages;
4890     lreq->preply_len = preply_len;
4891 
4892     ceph_oid_copy(&lreq->t.base_oid, oid);
4893     ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4894     lreq->t.flags = CEPH_OSD_FLAG_READ;
4895 
4896     linger_submit(lreq);
4897     ret = linger_reg_commit_wait(lreq);
4898     if (!ret)
4899         ret = linger_notify_finish_wait(lreq);
4900     else
4901         dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4902 
4903     linger_cancel(lreq);
4904 out_put_lreq:
4905     linger_put(lreq);
4906     return ret;
4907 }
4908 EXPORT_SYMBOL(ceph_osdc_notify);
4909 
4910 /*
4911  * Return the number of milliseconds since the watch was last
4912  * confirmed, or an error.  If there is an error, the watch is no
4913  * longer valid, and should be destroyed with ceph_osdc_unwatch().
4914  */
4915 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
4916               struct ceph_osd_linger_request *lreq)
4917 {
4918     unsigned long stamp, age;
4919     int ret;
4920 
4921     down_read(&osdc->lock);
4922     mutex_lock(&lreq->lock);
4923     stamp = lreq->watch_valid_thru;
4924     if (!list_empty(&lreq->pending_lworks)) {
4925         struct linger_work *lwork =
4926             list_first_entry(&lreq->pending_lworks,
4927                      struct linger_work,
4928                      pending_item);
4929 
4930         if (time_before(lwork->queued_stamp, stamp))
4931             stamp = lwork->queued_stamp;
4932     }
4933     age = jiffies - stamp;
4934     dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
4935          lreq, lreq->linger_id, age, lreq->last_error);
4936     /* we are truncating to msecs, so return a safe upper bound */
4937     ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
4938 
4939     mutex_unlock(&lreq->lock);
4940     up_read(&osdc->lock);
4941     return ret;
4942 }
4943 
4944 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4945 {
4946     u8 struct_v;
4947     u32 struct_len;
4948     int ret;
4949 
4950     ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4951                   &struct_v, &struct_len);
4952     if (ret)
4953         goto bad;
4954 
4955     ret = -EINVAL;
4956     ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
4957     ceph_decode_64_safe(p, end, item->cookie, bad);
4958     ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
4959 
4960     if (struct_v >= 2) {
4961         ret = ceph_decode_entity_addr(p, end, &item->addr);
4962         if (ret)
4963             goto bad;
4964     } else {
4965         ret = 0;
4966     }
4967 
4968     dout("%s %s%llu cookie %llu addr %s\n", __func__,
4969          ENTITY_NAME(item->name), item->cookie,
4970          ceph_pr_addr(&item->addr));
4971 bad:
4972     return ret;
4973 }
4974 
4975 static int decode_watchers(void **p, void *end,
4976                struct ceph_watch_item **watchers,
4977                u32 *num_watchers)
4978 {
4979     u8 struct_v;
4980     u32 struct_len;
4981     int i;
4982     int ret;
4983 
4984     ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
4985                   &struct_v, &struct_len);
4986     if (ret)
4987         return ret;
4988 
4989     *num_watchers = ceph_decode_32(p);
4990     *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
4991     if (!*watchers)
4992         return -ENOMEM;
4993 
4994     for (i = 0; i < *num_watchers; i++) {
4995         ret = decode_watcher(p, end, *watchers + i);
4996         if (ret) {
4997             kfree(*watchers);
4998             return ret;
4999         }
5000     }
5001 
5002     return 0;
5003 }
5004 
5005 /*
5006  * On success, the caller is responsible for:
5007  *
5008  *     kfree(watchers);
5009  */
5010 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
5011                 struct ceph_object_id *oid,
5012                 struct ceph_object_locator *oloc,
5013                 struct ceph_watch_item **watchers,
5014                 u32 *num_watchers)
5015 {
5016     struct ceph_osd_request *req;
5017     struct page **pages;
5018     int ret;
5019 
5020     req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5021     if (!req)
5022         return -ENOMEM;
5023 
5024     ceph_oid_copy(&req->r_base_oid, oid);
5025     ceph_oloc_copy(&req->r_base_oloc, oloc);
5026     req->r_flags = CEPH_OSD_FLAG_READ;
5027 
5028     pages = ceph_alloc_page_vector(1, GFP_NOIO);
5029     if (IS_ERR(pages)) {
5030         ret = PTR_ERR(pages);
5031         goto out_put_req;
5032     }
5033 
5034     osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
5035     ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
5036                          response_data),
5037                  pages, PAGE_SIZE, 0, false, true);
5038 
5039     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5040     if (ret)
5041         goto out_put_req;
5042 
5043     ceph_osdc_start_request(osdc, req);
5044     ret = ceph_osdc_wait_request(osdc, req);
5045     if (ret >= 0) {
5046         void *p = page_address(pages[0]);
5047         void *const end = p + req->r_ops[0].outdata_len;
5048 
5049         ret = decode_watchers(&p, end, watchers, num_watchers);
5050     }
5051 
5052 out_put_req:
5053     ceph_osdc_put_request(req);
5054     return ret;
5055 }
5056 EXPORT_SYMBOL(ceph_osdc_list_watchers);
5057 
5058 /*
5059  * Call all pending notify callbacks - for use after a watch is
5060  * unregistered, to make sure no more callbacks for it will be invoked
5061  */
5062 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
5063 {
5064     dout("%s osdc %p\n", __func__, osdc);
5065     flush_workqueue(osdc->notify_wq);
5066 }
5067 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
5068 
5069 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
5070 {
5071     down_read(&osdc->lock);
5072     maybe_request_map(osdc);
5073     up_read(&osdc->lock);
5074 }
5075 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
5076 
5077 /*
5078  * Execute an OSD class method on an object.
5079  *
5080  * @flags: CEPH_OSD_FLAG_*
5081  * @resp_len: in/out param for reply length
5082  */
5083 int ceph_osdc_call(struct ceph_osd_client *osdc,
5084            struct ceph_object_id *oid,
5085            struct ceph_object_locator *oloc,
5086            const char *class, const char *method,
5087            unsigned int flags,
5088            struct page *req_page, size_t req_len,
5089            struct page **resp_pages, size_t *resp_len)
5090 {
5091     struct ceph_osd_request *req;
5092     int ret;
5093 
5094     if (req_len > PAGE_SIZE)
5095         return -E2BIG;
5096 
5097     req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5098     if (!req)
5099         return -ENOMEM;
5100 
5101     ceph_oid_copy(&req->r_base_oid, oid);
5102     ceph_oloc_copy(&req->r_base_oloc, oloc);
5103     req->r_flags = flags;
5104 
5105     ret = osd_req_op_cls_init(req, 0, class, method);
5106     if (ret)
5107         goto out_put_req;
5108 
5109     if (req_page)
5110         osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
5111                           0, false, false);
5112     if (resp_pages)
5113         osd_req_op_cls_response_data_pages(req, 0, resp_pages,
5114                            *resp_len, 0, false, false);
5115 
5116     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5117     if (ret)
5118         goto out_put_req;
5119 
5120     ceph_osdc_start_request(osdc, req);
5121     ret = ceph_osdc_wait_request(osdc, req);
5122     if (ret >= 0) {
5123         ret = req->r_ops[0].rval;
5124         if (resp_pages)
5125             *resp_len = req->r_ops[0].outdata_len;
5126     }
5127 
5128 out_put_req:
5129     ceph_osdc_put_request(req);
5130     return ret;
5131 }
5132 EXPORT_SYMBOL(ceph_osdc_call);
5133 
5134 /*
5135  * reset all osd connections
5136  */
5137 void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5138 {
5139     struct rb_node *n;
5140 
5141     down_write(&osdc->lock);
5142     for (n = rb_first(&osdc->osds); n; ) {
5143         struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5144 
5145         n = rb_next(n);
5146         if (!reopen_osd(osd))
5147             kick_osd_requests(osd);
5148     }
5149     up_write(&osdc->lock);
5150 }
5151 
5152 /*
5153  * init, shutdown
5154  */
5155 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5156 {
5157     int err;
5158 
5159     dout("init\n");
5160     osdc->client = client;
5161     init_rwsem(&osdc->lock);
5162     osdc->osds = RB_ROOT;
5163     INIT_LIST_HEAD(&osdc->osd_lru);
5164     spin_lock_init(&osdc->osd_lru_lock);
5165     osd_init(&osdc->homeless_osd);
5166     osdc->homeless_osd.o_osdc = osdc;
5167     osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
5168     osdc->last_linger_id = CEPH_LINGER_ID_START;
5169     osdc->linger_requests = RB_ROOT;
5170     osdc->map_checks = RB_ROOT;
5171     osdc->linger_map_checks = RB_ROOT;
5172     INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
5173     INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
5174 
5175     err = -ENOMEM;
5176     osdc->osdmap = ceph_osdmap_alloc();
5177     if (!osdc->osdmap)
5178         goto out;
5179 
5180     osdc->req_mempool = mempool_create_slab_pool(10,
5181                              ceph_osd_request_cache);
5182     if (!osdc->req_mempool)
5183         goto out_map;
5184 
5185     err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5186                 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5187     if (err < 0)
5188         goto out_mempool;
5189     err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5190                 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5191                 "osd_op_reply");
5192     if (err < 0)
5193         goto out_msgpool;
5194 
5195     err = -ENOMEM;
5196     osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
5197     if (!osdc->notify_wq)
5198         goto out_msgpool_reply;
5199 
5200     osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5201     if (!osdc->completion_wq)
5202         goto out_notify_wq;
5203 
5204     schedule_delayed_work(&osdc->timeout_work,
5205                   osdc->client->options->osd_keepalive_timeout);
5206     schedule_delayed_work(&osdc->osds_timeout_work,
5207         round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5208 
5209     return 0;
5210 
5211 out_notify_wq:
5212     destroy_workqueue(osdc->notify_wq);
5213 out_msgpool_reply:
5214     ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5215 out_msgpool:
5216     ceph_msgpool_destroy(&osdc->msgpool_op);
5217 out_mempool:
5218     mempool_destroy(osdc->req_mempool);
5219 out_map:
5220     ceph_osdmap_destroy(osdc->osdmap);
5221 out:
5222     return err;
5223 }
5224 
5225 void ceph_osdc_stop(struct ceph_osd_client *osdc)
5226 {
5227     destroy_workqueue(osdc->completion_wq);
5228     destroy_workqueue(osdc->notify_wq);
5229     cancel_delayed_work_sync(&osdc->timeout_work);
5230     cancel_delayed_work_sync(&osdc->osds_timeout_work);
5231 
5232     down_write(&osdc->lock);
5233     while (!RB_EMPTY_ROOT(&osdc->osds)) {
5234         struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5235                         struct ceph_osd, o_node);
5236         close_osd(osd);
5237     }
5238     up_write(&osdc->lock);
5239     WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5240     osd_cleanup(&osdc->homeless_osd);
5241 
5242     WARN_ON(!list_empty(&osdc->osd_lru));
5243     WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5244     WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5245     WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5246     WARN_ON(atomic_read(&osdc->num_requests));
5247     WARN_ON(atomic_read(&osdc->num_homeless));
5248 
5249     ceph_osdmap_destroy(osdc->osdmap);
5250     mempool_destroy(osdc->req_mempool);
5251     ceph_msgpool_destroy(&osdc->msgpool_op);
5252     ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5253 }
5254 
5255 int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5256                   u64 src_snapid, u64 src_version,
5257                   struct ceph_object_id *src_oid,
5258                   struct ceph_object_locator *src_oloc,
5259                   u32 src_fadvise_flags,
5260                   u32 dst_fadvise_flags,
5261                   u32 truncate_seq, u64 truncate_size,
5262                   u8 copy_from_flags)
5263 {
5264     struct ceph_osd_req_op *op;
5265     struct page **pages;
5266     void *p, *end;
5267 
5268     pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5269     if (IS_ERR(pages))
5270         return PTR_ERR(pages);
5271 
5272     op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
5273                  dst_fadvise_flags);
5274     op->copy_from.snapid = src_snapid;
5275     op->copy_from.src_version = src_version;
5276     op->copy_from.flags = copy_from_flags;
5277     op->copy_from.src_fadvise_flags = src_fadvise_flags;
5278 
5279     p = page_address(pages[0]);
5280     end = p + PAGE_SIZE;
5281     ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5282     encode_oloc(&p, end, src_oloc);
5283     ceph_encode_32(&p, truncate_seq);
5284     ceph_encode_64(&p, truncate_size);
5285     op->indata_len = PAGE_SIZE - (end - p);
5286 
5287     ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5288                  op->indata_len, 0, false, true);
5289     return 0;
5290 }
5291 EXPORT_SYMBOL(osd_req_op_copy_from_init);
5292 
5293 int __init ceph_osdc_setup(void)
5294 {
5295     size_t size = sizeof(struct ceph_osd_request) +
5296         CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5297 
5298     BUG_ON(ceph_osd_request_cache);
5299     ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5300                            0, 0, NULL);
5301 
5302     return ceph_osd_request_cache ? 0 : -ENOMEM;
5303 }
5304 
5305 void ceph_osdc_cleanup(void)
5306 {
5307     BUG_ON(!ceph_osd_request_cache);
5308     kmem_cache_destroy(ceph_osd_request_cache);
5309     ceph_osd_request_cache = NULL;
5310 }
5311 
5312 /*
5313  * handle incoming message
5314  */
5315 static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5316 {
5317     struct ceph_osd *osd = con->private;
5318     struct ceph_osd_client *osdc = osd->o_osdc;
5319     int type = le16_to_cpu(msg->hdr.type);
5320 
5321     switch (type) {
5322     case CEPH_MSG_OSD_MAP:
5323         ceph_osdc_handle_map(osdc, msg);
5324         break;
5325     case CEPH_MSG_OSD_OPREPLY:
5326         handle_reply(osd, msg);
5327         break;
5328     case CEPH_MSG_OSD_BACKOFF:
5329         handle_backoff(osd, msg);
5330         break;
5331     case CEPH_MSG_WATCH_NOTIFY:
5332         handle_watch_notify(osdc, msg);
5333         break;
5334 
5335     default:
5336         pr_err("received unknown message type %d %s\n", type,
5337                ceph_msg_type_name(type));
5338     }
5339 
5340     ceph_msg_put(msg);
5341 }
5342 
5343 /*
5344  * Lookup and return message for incoming reply.  Don't try to do
5345  * anything about a larger than preallocated data portion of the
5346  * message at the moment - for now, just skip the message.
5347  */
5348 static struct ceph_msg *get_reply(struct ceph_connection *con,
5349                   struct ceph_msg_header *hdr,
5350                   int *skip)
5351 {
5352     struct ceph_osd *osd = con->private;
5353     struct ceph_osd_client *osdc = osd->o_osdc;
5354     struct ceph_msg *m = NULL;
5355     struct ceph_osd_request *req;
5356     int front_len = le32_to_cpu(hdr->front_len);
5357     int data_len = le32_to_cpu(hdr->data_len);
5358     u64 tid = le64_to_cpu(hdr->tid);
5359 
5360     down_read(&osdc->lock);
5361     if (!osd_registered(osd)) {
5362         dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5363         *skip = 1;
5364         goto out_unlock_osdc;
5365     }
5366     WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5367 
5368     mutex_lock(&osd->lock);
5369     req = lookup_request(&osd->o_requests, tid);
5370     if (!req) {
5371         dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5372              osd->o_osd, tid);
5373         *skip = 1;
5374         goto out_unlock_session;
5375     }
5376 
5377     ceph_msg_revoke_incoming(req->r_reply);
5378 
5379     if (front_len > req->r_reply->front_alloc_len) {
5380         pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5381             __func__, osd->o_osd, req->r_tid, front_len,
5382             req->r_reply->front_alloc_len);
5383         m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5384                  false);
5385         if (!m)
5386             goto out_unlock_session;
5387         ceph_msg_put(req->r_reply);
5388         req->r_reply = m;
5389     }
5390 
5391     if (data_len > req->r_reply->data_length) {
5392         pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5393             __func__, osd->o_osd, req->r_tid, data_len,
5394             req->r_reply->data_length);
5395         m = NULL;
5396         *skip = 1;
5397         goto out_unlock_session;
5398     }
5399 
5400     m = ceph_msg_get(req->r_reply);
5401     dout("get_reply tid %lld %p\n", tid, m);
5402 
5403 out_unlock_session:
5404     mutex_unlock(&osd->lock);
5405 out_unlock_osdc:
5406     up_read(&osdc->lock);
5407     return m;
5408 }
5409 
5410 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5411 {
5412     struct ceph_msg *m;
5413     int type = le16_to_cpu(hdr->type);
5414     u32 front_len = le32_to_cpu(hdr->front_len);
5415     u32 data_len = le32_to_cpu(hdr->data_len);
5416 
5417     m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5418     if (!m)
5419         return NULL;
5420 
5421     if (data_len) {
5422         struct page **pages;
5423 
5424         pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5425                            GFP_NOIO);
5426         if (IS_ERR(pages)) {
5427             ceph_msg_put(m);
5428             return NULL;
5429         }
5430 
5431         ceph_msg_data_add_pages(m, pages, data_len, 0, true);
5432     }
5433 
5434     return m;
5435 }
5436 
5437 static struct ceph_msg *osd_alloc_msg(struct ceph_connection *con,
5438                       struct ceph_msg_header *hdr,
5439                       int *skip)
5440 {
5441     struct ceph_osd *osd = con->private;
5442     int type = le16_to_cpu(hdr->type);
5443 
5444     *skip = 0;
5445     switch (type) {
5446     case CEPH_MSG_OSD_MAP:
5447     case CEPH_MSG_OSD_BACKOFF:
5448     case CEPH_MSG_WATCH_NOTIFY:
5449         return alloc_msg_with_page_vector(hdr);
5450     case CEPH_MSG_OSD_OPREPLY:
5451         return get_reply(con, hdr, skip);
5452     default:
5453         pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5454             osd->o_osd, type);
5455         *skip = 1;
5456         return NULL;
5457     }
5458 }
5459 
5460 /*
5461  * Wrappers to refcount containing ceph_osd struct
5462  */
5463 static struct ceph_connection *osd_get_con(struct ceph_connection *con)
5464 {
5465     struct ceph_osd *osd = con->private;
5466     if (get_osd(osd))
5467         return con;
5468     return NULL;
5469 }
5470 
5471 static void osd_put_con(struct ceph_connection *con)
5472 {
5473     struct ceph_osd *osd = con->private;
5474     put_osd(osd);
5475 }
5476 
5477 /*
5478  * authentication
5479  */
5480 
5481 /*
5482  * Note: returned pointer is the address of a structure that's
5483  * managed separately.  Caller must *not* attempt to free it.
5484  */
5485 static struct ceph_auth_handshake *
5486 osd_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5487 {
5488     struct ceph_osd *o = con->private;
5489     struct ceph_osd_client *osdc = o->o_osdc;
5490     struct ceph_auth_client *ac = osdc->client->monc.auth;
5491     struct ceph_auth_handshake *auth = &o->o_auth;
5492     int ret;
5493 
5494     ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5495                      force_new, proto, NULL, NULL);
5496     if (ret)
5497         return ERR_PTR(ret);
5498 
5499     return auth;
5500 }
5501 
5502 static int osd_add_authorizer_challenge(struct ceph_connection *con,
5503                     void *challenge_buf, int challenge_buf_len)
5504 {
5505     struct ceph_osd *o = con->private;
5506     struct ceph_osd_client *osdc = o->o_osdc;
5507     struct ceph_auth_client *ac = osdc->client->monc.auth;
5508 
5509     return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer,
5510                         challenge_buf, challenge_buf_len);
5511 }
5512 
5513 static int osd_verify_authorizer_reply(struct ceph_connection *con)
5514 {
5515     struct ceph_osd *o = con->private;
5516     struct ceph_osd_client *osdc = o->o_osdc;
5517     struct ceph_auth_client *ac = osdc->client->monc.auth;
5518     struct ceph_auth_handshake *auth = &o->o_auth;
5519 
5520     return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5521         auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5522         NULL, NULL, NULL, NULL);
5523 }
5524 
5525 static int osd_invalidate_authorizer(struct ceph_connection *con)
5526 {
5527     struct ceph_osd *o = con->private;
5528     struct ceph_osd_client *osdc = o->o_osdc;
5529     struct ceph_auth_client *ac = osdc->client->monc.auth;
5530 
5531     ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5532     return ceph_monc_validate_auth(&osdc->client->monc);
5533 }
5534 
5535 static int osd_get_auth_request(struct ceph_connection *con,
5536                 void *buf, int *buf_len,
5537                 void **authorizer, int *authorizer_len)
5538 {
5539     struct ceph_osd *o = con->private;
5540     struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5541     struct ceph_auth_handshake *auth = &o->o_auth;
5542     int ret;
5543 
5544     ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5545                        buf, buf_len);
5546     if (ret)
5547         return ret;
5548 
5549     *authorizer = auth->authorizer_buf;
5550     *authorizer_len = auth->authorizer_buf_len;
5551     return 0;
5552 }
5553 
5554 static int osd_handle_auth_reply_more(struct ceph_connection *con,
5555                       void *reply, int reply_len,
5556                       void *buf, int *buf_len,
5557                       void **authorizer, int *authorizer_len)
5558 {
5559     struct ceph_osd *o = con->private;
5560     struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5561     struct ceph_auth_handshake *auth = &o->o_auth;
5562     int ret;
5563 
5564     ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5565                           buf, buf_len);
5566     if (ret)
5567         return ret;
5568 
5569     *authorizer = auth->authorizer_buf;
5570     *authorizer_len = auth->authorizer_buf_len;
5571     return 0;
5572 }
5573 
5574 static int osd_handle_auth_done(struct ceph_connection *con,
5575                 u64 global_id, void *reply, int reply_len,
5576                 u8 *session_key, int *session_key_len,
5577                 u8 *con_secret, int *con_secret_len)
5578 {
5579     struct ceph_osd *o = con->private;
5580     struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5581     struct ceph_auth_handshake *auth = &o->o_auth;
5582 
5583     return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5584                            session_key, session_key_len,
5585                            con_secret, con_secret_len);
5586 }
5587 
5588 static int osd_handle_auth_bad_method(struct ceph_connection *con,
5589                       int used_proto, int result,
5590                       const int *allowed_protos, int proto_cnt,
5591                       const int *allowed_modes, int mode_cnt)
5592 {
5593     struct ceph_osd *o = con->private;
5594     struct ceph_mon_client *monc = &o->o_osdc->client->monc;
5595     int ret;
5596 
5597     if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD,
5598                         used_proto, result,
5599                         allowed_protos, proto_cnt,
5600                         allowed_modes, mode_cnt)) {
5601         ret = ceph_monc_validate_auth(monc);
5602         if (ret)
5603             return ret;
5604     }
5605 
5606     return -EACCES;
5607 }
5608 
5609 static void osd_reencode_message(struct ceph_msg *msg)
5610 {
5611     int type = le16_to_cpu(msg->hdr.type);
5612 
5613     if (type == CEPH_MSG_OSD_OP)
5614         encode_request_finish(msg);
5615 }
5616 
5617 static int osd_sign_message(struct ceph_msg *msg)
5618 {
5619     struct ceph_osd *o = msg->con->private;
5620     struct ceph_auth_handshake *auth = &o->o_auth;
5621 
5622     return ceph_auth_sign_message(auth, msg);
5623 }
5624 
5625 static int osd_check_message_signature(struct ceph_msg *msg)
5626 {
5627     struct ceph_osd *o = msg->con->private;
5628     struct ceph_auth_handshake *auth = &o->o_auth;
5629 
5630     return ceph_auth_check_message_signature(auth, msg);
5631 }
5632 
5633 static const struct ceph_connection_operations osd_con_ops = {
5634     .get = osd_get_con,
5635     .put = osd_put_con,
5636     .alloc_msg = osd_alloc_msg,
5637     .dispatch = osd_dispatch,
5638     .fault = osd_fault,
5639     .reencode_message = osd_reencode_message,
5640     .get_authorizer = osd_get_authorizer,
5641     .add_authorizer_challenge = osd_add_authorizer_challenge,
5642     .verify_authorizer_reply = osd_verify_authorizer_reply,
5643     .invalidate_authorizer = osd_invalidate_authorizer,
5644     .sign_message = osd_sign_message,
5645     .check_message_signature = osd_check_message_signature,
5646     .get_auth_request = osd_get_auth_request,
5647     .handle_auth_reply_more = osd_handle_auth_reply_more,
5648     .handle_auth_done = osd_handle_auth_done,
5649     .handle_auth_bad_method = osd_handle_auth_bad_method,
5650 };