0001 // SPDX-License-Identifier: GPL-2.0
0002 
0003 #include <linux/ceph/ceph_debug.h>
0004 
0005 #include <linux/module.h>
0006 #include <linux/slab.h>
0007 
0008 #include <linux/ceph/libceph.h>
0009 #include <linux/ceph/osdmap.h>
0010 #include <linux/ceph/decode.h>
0011 #include <linux/crush/hash.h>
0012 #include <linux/crush/mapper.h>
0013 
0014 static __printf(2, 3)
0015 void osdmap_info(const struct ceph_osdmap *map, const char *fmt, ...)
0016 {
0017     struct va_format vaf;
0018     va_list args;
0019 
0020     va_start(args, fmt);
0021     vaf.fmt = fmt;
0022     vaf.va = &args;
0023 
0024     printk(KERN_INFO "%s (%pU e%u): %pV", KBUILD_MODNAME, &map->fsid,
0025            map->epoch, &vaf);
0026 
0027     va_end(args);
0028 }
0029 
0030 char *ceph_osdmap_state_str(char *str, int len, u32 state)
0031 {
0032     if (!len)
0033         return str;
0034 
0035     if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
0036         snprintf(str, len, "exists, up");
0037     else if (state & CEPH_OSD_EXISTS)
0038         snprintf(str, len, "exists");
0039     else if (state & CEPH_OSD_UP)
0040         snprintf(str, len, "up");
0041     else
0042         snprintf(str, len, "doesn't exist");
0043 
0044     return str;
0045 }
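
/*
 * A minimal usage sketch (illustrative; buffer size chosen arbitrarily):
 * the caller owns the buffer, and the function returns it for convenience.
 *
 *	char buf[32];
 *
 *	ceph_osdmap_state_str(buf, sizeof(buf),
 *			      CEPH_OSD_EXISTS | CEPH_OSD_UP);
 *	pr_info("osd state: %s\n", buf);	(prints "exists, up")
 */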
0046 
0047 /* maps */
0048 
0049 static int calc_bits_of(unsigned int t)
0050 {
0051     int b = 0;
0052     while (t) {
0053         t = t >> 1;
0054         b++;
0055     }
0056     return b;
0057 }
0058 
0059 /*
0060  * the foo_mask is the smallest value 2^n-1 that is >= foo-1.
0061  */
0062 static void calc_pg_masks(struct ceph_pg_pool_info *pi)
0063 {
0064     pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
0065     pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
0066 }
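
/*
 * Worked example: for pg_num = 12, calc_bits_of(11) returns 4, so
 * pg_num_mask = (1 << 4) - 1 = 15, covering 0..11.  For a power of two
 * such as pg_num = 8, calc_bits_of(7) returns 3 and the mask is exactly
 * pg_num - 1 = 7.
 */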
0067 
0068 /*
0069  * decode crush map
0070  */
0071 static int crush_decode_uniform_bucket(void **p, void *end,
0072                        struct crush_bucket_uniform *b)
0073 {
0074     dout("crush_decode_uniform_bucket %p to %p\n", *p, end);
0075     ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
0076     b->item_weight = ceph_decode_32(p);
0077     return 0;
0078 bad:
0079     return -EINVAL;
0080 }
0081 
0082 static int crush_decode_list_bucket(void **p, void *end,
0083                     struct crush_bucket_list *b)
0084 {
0085     int j;
0086     dout("crush_decode_list_bucket %p to %p\n", *p, end);
0087     b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
0088     if (b->item_weights == NULL)
0089         return -ENOMEM;
0090     b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
0091     if (b->sum_weights == NULL)
0092         return -ENOMEM;
0093     ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
0094     for (j = 0; j < b->h.size; j++) {
0095         b->item_weights[j] = ceph_decode_32(p);
0096         b->sum_weights[j] = ceph_decode_32(p);
0097     }
0098     return 0;
0099 bad:
0100     return -EINVAL;
0101 }
0102 
0103 static int crush_decode_tree_bucket(void **p, void *end,
0104                     struct crush_bucket_tree *b)
0105 {
0106     int j;
0107     dout("crush_decode_tree_bucket %p to %p\n", *p, end);
0108     ceph_decode_8_safe(p, end, b->num_nodes, bad);
0109     b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
0110     if (b->node_weights == NULL)
0111         return -ENOMEM;
0112     ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
0113     for (j = 0; j < b->num_nodes; j++)
0114         b->node_weights[j] = ceph_decode_32(p);
0115     return 0;
0116 bad:
0117     return -EINVAL;
0118 }
0119 
0120 static int crush_decode_straw_bucket(void **p, void *end,
0121                      struct crush_bucket_straw *b)
0122 {
0123     int j;
0124     dout("crush_decode_straw_bucket %p to %p\n", *p, end);
0125     b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
0126     if (b->item_weights == NULL)
0127         return -ENOMEM;
0128     b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
0129     if (b->straws == NULL)
0130         return -ENOMEM;
0131     ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
0132     for (j = 0; j < b->h.size; j++) {
0133         b->item_weights[j] = ceph_decode_32(p);
0134         b->straws[j] = ceph_decode_32(p);
0135     }
0136     return 0;
0137 bad:
0138     return -EINVAL;
0139 }
0140 
0141 static int crush_decode_straw2_bucket(void **p, void *end,
0142                       struct crush_bucket_straw2 *b)
0143 {
0144     int j;
0145     dout("crush_decode_straw2_bucket %p to %p\n", *p, end);
0146     b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
0147     if (b->item_weights == NULL)
0148         return -ENOMEM;
0149     ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
0150     for (j = 0; j < b->h.size; j++)
0151         b->item_weights[j] = ceph_decode_32(p);
0152     return 0;
0153 bad:
0154     return -EINVAL;
0155 }
0156 
0157 struct crush_name_node {
0158     struct rb_node cn_node;
0159     int cn_id;
0160     char cn_name[];
0161 };
0162 
0163 static struct crush_name_node *alloc_crush_name(size_t name_len)
0164 {
0165     struct crush_name_node *cn;
0166 
0167     cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
0168     if (!cn)
0169         return NULL;
0170 
0171     RB_CLEAR_NODE(&cn->cn_node);
0172     return cn;
0173 }
0174 
0175 static void free_crush_name(struct crush_name_node *cn)
0176 {
0177     WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
0178 
0179     kfree(cn);
0180 }
0181 
0182 DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
0183 
0184 static int decode_crush_names(void **p, void *end, struct rb_root *root)
0185 {
0186     u32 n;
0187 
0188     ceph_decode_32_safe(p, end, n, e_inval);
0189     while (n--) {
0190         struct crush_name_node *cn;
0191         int id;
0192         u32 name_len;
0193 
0194         ceph_decode_32_safe(p, end, id, e_inval);
0195         ceph_decode_32_safe(p, end, name_len, e_inval);
0196         ceph_decode_need(p, end, name_len, e_inval);
0197 
0198         cn = alloc_crush_name(name_len);
0199         if (!cn)
0200             return -ENOMEM;
0201 
0202         cn->cn_id = id;
0203         memcpy(cn->cn_name, *p, name_len);
0204         cn->cn_name[name_len] = '\0';
0205         *p += name_len;
0206 
0207         if (!__insert_crush_name(root, cn)) {
0208             free_crush_name(cn);
0209             return -EEXIST;
0210         }
0211     }
0212 
0213     return 0;
0214 
0215 e_inval:
0216     return -EINVAL;
0217 }
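
/*
 * The layout decoded above is a plain count-prefixed map, e.g. for two
 * entries:
 *
 *	u32 n = 2
 *	{ s32 id, u32 name_len, name_len bytes of name }  (twice)
 */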
0218 
0219 void clear_crush_names(struct rb_root *root)
0220 {
0221     while (!RB_EMPTY_ROOT(root)) {
0222         struct crush_name_node *cn =
0223             rb_entry(rb_first(root), struct crush_name_node, cn_node);
0224 
0225         erase_crush_name(root, cn);
0226         free_crush_name(cn);
0227     }
0228 }
0229 
0230 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
0231 {
0232     struct crush_choose_arg_map *arg_map;
0233 
0234     arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
0235     if (!arg_map)
0236         return NULL;
0237 
0238     RB_CLEAR_NODE(&arg_map->node);
0239     return arg_map;
0240 }
0241 
0242 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
0243 {
0244     if (arg_map) {
0245         int i, j;
0246 
0247         WARN_ON(!RB_EMPTY_NODE(&arg_map->node));
0248 
0249         for (i = 0; i < arg_map->size; i++) {
0250             struct crush_choose_arg *arg = &arg_map->args[i];
0251 
0252             for (j = 0; j < arg->weight_set_size; j++)
0253                 kfree(arg->weight_set[j].weights);
0254             kfree(arg->weight_set);
0255             kfree(arg->ids);
0256         }
0257         kfree(arg_map->args);
0258         kfree(arg_map);
0259     }
0260 }
0261 
0262 DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
0263         node);
0264 
0265 void clear_choose_args(struct crush_map *c)
0266 {
0267     while (!RB_EMPTY_ROOT(&c->choose_args)) {
0268         struct crush_choose_arg_map *arg_map =
0269             rb_entry(rb_first(&c->choose_args),
0270                  struct crush_choose_arg_map, node);
0271 
0272         erase_choose_arg_map(&c->choose_args, arg_map);
0273         free_choose_arg_map(arg_map);
0274     }
0275 }
0276 
0277 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
0278 {
0279     u32 *a = NULL;
0280     u32 len;
0281     int ret;
0282 
0283     ceph_decode_32_safe(p, end, len, e_inval);
0284     if (len) {
0285         u32 i;
0286 
0287         a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
0288         if (!a) {
0289             ret = -ENOMEM;
0290             goto fail;
0291         }
0292 
0293         ceph_decode_need(p, end, len * sizeof(u32), e_inval);
0294         for (i = 0; i < len; i++)
0295             a[i] = ceph_decode_32(p);
0296     }
0297 
0298     *plen = len;
0299     return a;
0300 
0301 e_inval:
0302     ret = -EINVAL;
0303 fail:
0304     kfree(a);
0305     return ERR_PTR(ret);
0306 }
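
/*
 * The corresponding encoding is simply a u32 count followed by that many
 * u32s.  A zero count returns NULL with *plen == 0 (not an error), which
 * the choose_arg decoding below treats as "absent".
 */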
0307 
0308 /*
0309  * Assumes @arg is zero-initialized.
0310  */
0311 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
0312 {
0313     int ret;
0314 
0315     ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
0316     if (arg->weight_set_size) {
0317         u32 i;
0318 
0319         arg->weight_set = kmalloc_array(arg->weight_set_size,
0320                         sizeof(*arg->weight_set),
0321                         GFP_NOIO);
0322         if (!arg->weight_set)
0323             return -ENOMEM;
0324 
0325         for (i = 0; i < arg->weight_set_size; i++) {
0326             struct crush_weight_set *w = &arg->weight_set[i];
0327 
0328             w->weights = decode_array_32_alloc(p, end, &w->size);
0329             if (IS_ERR(w->weights)) {
0330                 ret = PTR_ERR(w->weights);
0331                 w->weights = NULL;
0332                 return ret;
0333             }
0334         }
0335     }
0336 
0337     arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
0338     if (IS_ERR(arg->ids)) {
0339         ret = PTR_ERR(arg->ids);
0340         arg->ids = NULL;
0341         return ret;
0342     }
0343 
0344     return 0;
0345 
0346 e_inval:
0347     return -EINVAL;
0348 }
0349 
0350 static int decode_choose_args(void **p, void *end, struct crush_map *c)
0351 {
0352     struct crush_choose_arg_map *arg_map = NULL;
0353     u32 num_choose_arg_maps, num_buckets;
0354     int ret;
0355 
0356     ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
0357     while (num_choose_arg_maps--) {
0358         arg_map = alloc_choose_arg_map();
0359         if (!arg_map) {
0360             ret = -ENOMEM;
0361             goto fail;
0362         }
0363 
0364         ceph_decode_64_safe(p, end, arg_map->choose_args_index,
0365                     e_inval);
0366         arg_map->size = c->max_buckets;
0367         arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
0368                     GFP_NOIO);
0369         if (!arg_map->args) {
0370             ret = -ENOMEM;
0371             goto fail;
0372         }
0373 
0374         ceph_decode_32_safe(p, end, num_buckets, e_inval);
0375         while (num_buckets--) {
0376             struct crush_choose_arg *arg;
0377             u32 bucket_index;
0378 
0379             ceph_decode_32_safe(p, end, bucket_index, e_inval);
0380             if (bucket_index >= arg_map->size)
0381                 goto e_inval;
0382 
0383             arg = &arg_map->args[bucket_index];
0384             ret = decode_choose_arg(p, end, arg);
0385             if (ret)
0386                 goto fail;
0387 
0388             if (arg->ids_size &&
0389                 arg->ids_size != c->buckets[bucket_index]->size)
0390                 goto e_inval;
0391         }
0392 
0393         insert_choose_arg_map(&c->choose_args, arg_map);
0394     }
0395 
0396     return 0;
0397 
0398 e_inval:
0399     ret = -EINVAL;
0400 fail:
0401     free_choose_arg_map(arg_map);
0402     return ret;
0403 }
0404 
0405 static void crush_finalize(struct crush_map *c)
0406 {
0407     __s32 b;
0408 
0409     /* Space for the array of pointers to per-bucket workspace */
0410     c->working_size = sizeof(struct crush_work) +
0411         c->max_buckets * sizeof(struct crush_work_bucket *);
0412 
0413     for (b = 0; b < c->max_buckets; b++) {
0414         if (!c->buckets[b])
0415             continue;
0416 
0417         switch (c->buckets[b]->alg) {
0418         default:
0419             /*
0420              * The base case, permutation variables and
0421              * the pointer to the permutation array.
0422              */
0423             c->working_size += sizeof(struct crush_work_bucket);
0424             break;
0425         }
0426         /* Every bucket has a permutation array. */
0427         c->working_size += c->buckets[b]->size * sizeof(__u32);
0428     }
0429 }
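
/*
 * Worked example: with max_buckets = 4 and three allocated buckets of
 * sizes 2, 4 and 6, the loop above yields
 *
 *	working_size = sizeof(struct crush_work)
 *		     + 4 * sizeof(struct crush_work_bucket *)
 *		     + 3 * sizeof(struct crush_work_bucket)
 *		     + (2 + 4 + 6) * sizeof(__u32);
 */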
0430 
0431 static struct crush_map *crush_decode(void *pbyval, void *end)
0432 {
0433     struct crush_map *c;
0434     int err;
0435     int i, j;
0436     void **p = &pbyval;
0437     void *start = pbyval;
0438     u32 magic;
0439 
0440     dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
0441 
0442     c = kzalloc(sizeof(*c), GFP_NOFS);
0443     if (c == NULL)
0444         return ERR_PTR(-ENOMEM);
0445 
0446     c->type_names = RB_ROOT;
0447     c->names = RB_ROOT;
0448     c->choose_args = RB_ROOT;
0449 
0450     /* set tunables to default values */
0451     c->choose_local_tries = 2;
0452     c->choose_local_fallback_tries = 5;
0453     c->choose_total_tries = 19;
0454     c->chooseleaf_descend_once = 0;
0455 
0456     ceph_decode_need(p, end, 4*sizeof(u32), bad);
0457     magic = ceph_decode_32(p);
0458     if (magic != CRUSH_MAGIC) {
0459         pr_err("crush_decode magic %x != current %x\n",
0460                (unsigned int)magic, (unsigned int)CRUSH_MAGIC);
0461         goto bad;
0462     }
0463     c->max_buckets = ceph_decode_32(p);
0464     c->max_rules = ceph_decode_32(p);
0465     c->max_devices = ceph_decode_32(p);
0466 
0467     c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
0468     if (c->buckets == NULL)
0469         goto badmem;
0470     c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
0471     if (c->rules == NULL)
0472         goto badmem;
0473 
0474     /* buckets */
0475     for (i = 0; i < c->max_buckets; i++) {
0476         int size = 0;
0477         u32 alg;
0478         struct crush_bucket *b;
0479 
0480         ceph_decode_32_safe(p, end, alg, bad);
0481         if (alg == 0) {
0482             c->buckets[i] = NULL;
0483             continue;
0484         }
0485         dout("crush_decode bucket %d off %x %p to %p\n",
0486              i, (int)(*p-start), *p, end);
0487 
0488         switch (alg) {
0489         case CRUSH_BUCKET_UNIFORM:
0490             size = sizeof(struct crush_bucket_uniform);
0491             break;
0492         case CRUSH_BUCKET_LIST:
0493             size = sizeof(struct crush_bucket_list);
0494             break;
0495         case CRUSH_BUCKET_TREE:
0496             size = sizeof(struct crush_bucket_tree);
0497             break;
0498         case CRUSH_BUCKET_STRAW:
0499             size = sizeof(struct crush_bucket_straw);
0500             break;
0501         case CRUSH_BUCKET_STRAW2:
0502             size = sizeof(struct crush_bucket_straw2);
0503             break;
0504         default:
0505             goto bad;
0506         }
0507         BUG_ON(size == 0);
0508         b = c->buckets[i] = kzalloc(size, GFP_NOFS);
0509         if (b == NULL)
0510             goto badmem;
0511 
0512         ceph_decode_need(p, end, 4*sizeof(u32), bad);
0513         b->id = ceph_decode_32(p);
0514         b->type = ceph_decode_16(p);
0515         b->alg = ceph_decode_8(p);
0516         b->hash = ceph_decode_8(p);
0517         b->weight = ceph_decode_32(p);
0518         b->size = ceph_decode_32(p);
0519 
0520         dout("crush_decode bucket size %d off %x %p to %p\n",
0521              b->size, (int)(*p-start), *p, end);
0522 
0523         b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
0524         if (b->items == NULL)
0525             goto badmem;
0526 
0527         ceph_decode_need(p, end, b->size*sizeof(u32), bad);
0528         for (j = 0; j < b->size; j++)
0529             b->items[j] = ceph_decode_32(p);
0530 
0531         switch (b->alg) {
0532         case CRUSH_BUCKET_UNIFORM:
0533             err = crush_decode_uniform_bucket(p, end,
0534                   (struct crush_bucket_uniform *)b);
0535             if (err < 0)
0536                 goto fail;
0537             break;
0538         case CRUSH_BUCKET_LIST:
0539             err = crush_decode_list_bucket(p, end,
0540                    (struct crush_bucket_list *)b);
0541             if (err < 0)
0542                 goto fail;
0543             break;
0544         case CRUSH_BUCKET_TREE:
0545             err = crush_decode_tree_bucket(p, end,
0546                 (struct crush_bucket_tree *)b);
0547             if (err < 0)
0548                 goto fail;
0549             break;
0550         case CRUSH_BUCKET_STRAW:
0551             err = crush_decode_straw_bucket(p, end,
0552                 (struct crush_bucket_straw *)b);
0553             if (err < 0)
0554                 goto fail;
0555             break;
0556         case CRUSH_BUCKET_STRAW2:
0557             err = crush_decode_straw2_bucket(p, end,
0558                 (struct crush_bucket_straw2 *)b);
0559             if (err < 0)
0560                 goto fail;
0561             break;
0562         }
0563     }
0564 
0565     /* rules */
0566     dout("rule vec is %p\n", c->rules);
0567     for (i = 0; i < c->max_rules; i++) {
0568         u32 yes;
0569         struct crush_rule *r;
0570 
0571         ceph_decode_32_safe(p, end, yes, bad);
0572         if (!yes) {
0573             dout("crush_decode NO rule %d off %x %p to %p\n",
0574                  i, (int)(*p-start), *p, end);
0575             c->rules[i] = NULL;
0576             continue;
0577         }
0578 
0579         dout("crush_decode rule %d off %x %p to %p\n",
0580              i, (int)(*p-start), *p, end);
0581 
0582         /* len */
0583         ceph_decode_32_safe(p, end, yes, bad);
0584 #if BITS_PER_LONG == 32
0585         if (yes > (ULONG_MAX - sizeof(*r))
0586               / sizeof(struct crush_rule_step))
0587             goto bad;
0588 #endif
0589         r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
0590         if (r == NULL)
0591             goto badmem;
0592         dout(" rule %d is at %p\n", i, r);
0593         c->rules[i] = r;
0594         r->len = yes;
0595         ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
0596         ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
0597         for (j = 0; j < r->len; j++) {
0598             r->steps[j].op = ceph_decode_32(p);
0599             r->steps[j].arg1 = ceph_decode_32(p);
0600             r->steps[j].arg2 = ceph_decode_32(p);
0601         }
0602     }
0603 
0604     err = decode_crush_names(p, end, &c->type_names);
0605     if (err)
0606         goto fail;
0607 
0608     err = decode_crush_names(p, end, &c->names);
0609     if (err)
0610         goto fail;
0611 
0612     ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
0613 
0614     /* tunables */
0615     ceph_decode_need(p, end, 3*sizeof(u32), done);
0616     c->choose_local_tries = ceph_decode_32(p);
0617     c->choose_local_fallback_tries = ceph_decode_32(p);
0618     c->choose_total_tries = ceph_decode_32(p);
0619     dout("crush decode tunable choose_local_tries = %d\n",
0620          c->choose_local_tries);
0621     dout("crush decode tunable choose_local_fallback_tries = %d\n",
0622          c->choose_local_fallback_tries);
0623     dout("crush decode tunable choose_total_tries = %d\n",
0624          c->choose_total_tries);
0625 
0626     ceph_decode_need(p, end, sizeof(u32), done);
0627     c->chooseleaf_descend_once = ceph_decode_32(p);
0628     dout("crush decode tunable chooseleaf_descend_once = %d\n",
0629          c->chooseleaf_descend_once);
0630 
0631     ceph_decode_need(p, end, sizeof(u8), done);
0632     c->chooseleaf_vary_r = ceph_decode_8(p);
0633     dout("crush decode tunable chooseleaf_vary_r = %d\n",
0634          c->chooseleaf_vary_r);
0635 
0636     /* skip straw_calc_version, allowed_bucket_algs */
0637     ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done);
0638     *p += sizeof(u8) + sizeof(u32);
0639 
0640     ceph_decode_need(p, end, sizeof(u8), done);
0641     c->chooseleaf_stable = ceph_decode_8(p);
0642     dout("crush decode tunable chooseleaf_stable = %d\n",
0643          c->chooseleaf_stable);
0644 
0645     if (*p != end) {
0646         /* class_map */
0647         ceph_decode_skip_map(p, end, 32, 32, bad);
0648         /* class_name */
0649         ceph_decode_skip_map(p, end, 32, string, bad);
0650         /* class_bucket */
0651         ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
0652     }
0653 
0654     if (*p != end) {
0655         err = decode_choose_args(p, end, c);
0656         if (err)
0657             goto fail;
0658     }
0659 
0660 done:
0661     crush_finalize(c);
0662     dout("crush_decode success\n");
0663     return c;
0664 
0665 badmem:
0666     err = -ENOMEM;
0667 fail:
0668     dout("crush_decode fail %d\n", err);
0669     crush_destroy(c);
0670     return ERR_PTR(err);
0671 
0672 bad:
0673     err = -EINVAL;
0674     goto fail;
0675 }
0676 
0677 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
0678 {
0679     if (lhs->pool < rhs->pool)
0680         return -1;
0681     if (lhs->pool > rhs->pool)
0682         return 1;
0683     if (lhs->seed < rhs->seed)
0684         return -1;
0685     if (lhs->seed > rhs->seed)
0686         return 1;
0687 
0688     return 0;
0689 }
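
/*
 * Plain lexicographic order with pool as the major key: e.g.
 * {pool 1, seed 7} sorts before {pool 2, seed 0}.  This comparator also
 * defines the ordering of the pg_mapping rbtrees declared below.
 */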
0690 
0691 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
0692 {
0693     int ret;
0694 
0695     ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
0696     if (ret)
0697         return ret;
0698 
0699     if (lhs->shard < rhs->shard)
0700         return -1;
0701     if (lhs->shard > rhs->shard)
0702         return 1;
0703 
0704     return 0;
0705 }
0706 
0707 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
0708 {
0709     struct ceph_pg_mapping *pg;
0710 
0711     pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
0712     if (!pg)
0713         return NULL;
0714 
0715     RB_CLEAR_NODE(&pg->node);
0716     return pg;
0717 }
0718 
0719 static void free_pg_mapping(struct ceph_pg_mapping *pg)
0720 {
0721     WARN_ON(!RB_EMPTY_NODE(&pg->node));
0722 
0723     kfree(pg);
0724 }
0725 
0726 /*
0727  * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
0728  * to a set of osds) and primary_temp (explicit primary setting)
0729  */
0730 DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
0731          RB_BYPTR, const struct ceph_pg *, node)
0732 
0733 /*
0734  * rbtree of pg pool info
0735  */
0736 DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
0737 
0738 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
0739 {
0740     return lookup_pg_pool(&map->pg_pools, id);
0741 }
0742 
0743 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
0744 {
0745     struct ceph_pg_pool_info *pi;
0746 
0747     if (id == CEPH_NOPOOL)
0748         return NULL;
0749 
0750     if (WARN_ON_ONCE(id > (u64) INT_MAX))
0751         return NULL;
0752 
0753     pi = lookup_pg_pool(&map->pg_pools, id);
0754     return pi ? pi->name : NULL;
0755 }
0756 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
0757 
0758 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
0759 {
0760     struct rb_node *rbp;
0761 
0762     for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) {
0763         struct ceph_pg_pool_info *pi =
0764             rb_entry(rbp, struct ceph_pg_pool_info, node);
0765         if (pi->name && strcmp(pi->name, name) == 0)
0766             return pi->id;
0767     }
0768     return -ENOENT;
0769 }
0770 EXPORT_SYMBOL(ceph_pg_poolid_by_name);
0771 
0772 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id)
0773 {
0774     struct ceph_pg_pool_info *pi;
0775 
0776     pi = lookup_pg_pool(&map->pg_pools, id);
0777     return pi ? pi->flags : 0;
0778 }
0779 EXPORT_SYMBOL(ceph_pg_pool_flags);
0780 
0781 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
0782 {
0783     erase_pg_pool(root, pi);
0784     kfree(pi->name);
0785     kfree(pi);
0786 }
0787 
0788 static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
0789 {
0790     u8 ev, cv;
0791     unsigned len, num;
0792     void *pool_end;
0793 
0794     ceph_decode_need(p, end, 2 + 4, bad);
0795     ev = ceph_decode_8(p);  /* encoding version */
0796     cv = ceph_decode_8(p);  /* compat version */
0797     if (ev < 5) {
0798         pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
0799         return -EINVAL;
0800     }
0801     if (cv > 9) {
0802         pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
0803         return -EINVAL;
0804     }
0805     len = ceph_decode_32(p);
0806     ceph_decode_need(p, end, len, bad);
0807     pool_end = *p + len;
0808 
0809     pi->type = ceph_decode_8(p);
0810     pi->size = ceph_decode_8(p);
0811     pi->crush_ruleset = ceph_decode_8(p);
0812     pi->object_hash = ceph_decode_8(p);
0813 
0814     pi->pg_num = ceph_decode_32(p);
0815     pi->pgp_num = ceph_decode_32(p);
0816 
0817     *p += 4 + 4;  /* skip lpg* */
0818     *p += 4;      /* skip last_change */
0819     *p += 8 + 4;  /* skip snap_seq, snap_epoch */
0820 
0821     /* skip snaps */
0822     num = ceph_decode_32(p);
0823     while (num--) {
0824         *p += 8;  /* snapid key */
0825         *p += 1 + 1; /* versions */
0826         len = ceph_decode_32(p);
0827         *p += len;
0828     }
0829 
0830     /* skip removed_snaps */
0831     num = ceph_decode_32(p);
0832     *p += num * (8 + 8);
0833 
0834     *p += 8;  /* skip auid */
0835     pi->flags = ceph_decode_64(p);
0836     *p += 4;  /* skip crash_replay_interval */
0837 
0838     if (ev >= 7)
0839         pi->min_size = ceph_decode_8(p);
0840     else
0841         pi->min_size = pi->size - pi->size / 2;
0842 
0843     if (ev >= 8)
0844         *p += 8 + 8;  /* skip quota_max_* */
0845 
0846     if (ev >= 9) {
0847         /* skip tiers */
0848         num = ceph_decode_32(p);
0849         *p += num * 8;
0850 
0851         *p += 8;  /* skip tier_of */
0852         *p += 1;  /* skip cache_mode */
0853 
0854         pi->read_tier = ceph_decode_64(p);
0855         pi->write_tier = ceph_decode_64(p);
0856     } else {
0857         pi->read_tier = -1;
0858         pi->write_tier = -1;
0859     }
0860 
0861     if (ev >= 10) {
0862         /* skip properties */
0863         num = ceph_decode_32(p);
0864         while (num--) {
0865             len = ceph_decode_32(p);
0866             *p += len; /* key */
0867             len = ceph_decode_32(p);
0868             *p += len; /* val */
0869         }
0870     }
0871 
0872     if (ev >= 11) {
0873         /* skip hit_set_params */
0874         *p += 1 + 1; /* versions */
0875         len = ceph_decode_32(p);
0876         *p += len;
0877 
0878         *p += 4; /* skip hit_set_period */
0879         *p += 4; /* skip hit_set_count */
0880     }
0881 
0882     if (ev >= 12)
0883         *p += 4; /* skip stripe_width */
0884 
0885     if (ev >= 13) {
0886         *p += 8; /* skip target_max_bytes */
0887         *p += 8; /* skip target_max_objects */
0888         *p += 4; /* skip cache_target_dirty_ratio_micro */
0889         *p += 4; /* skip cache_target_full_ratio_micro */
0890         *p += 4; /* skip cache_min_flush_age */
0891         *p += 4; /* skip cache_min_evict_age */
0892     }
0893 
0894     if (ev >= 14) {
0895         /* skip erasure_code_profile */
0896         len = ceph_decode_32(p);
0897         *p += len;
0898     }
0899 
0900     /*
0901      * last_force_op_resend_preluminous, will be overridden if the
0902      * map was encoded with RESEND_ON_SPLIT
0903      */
0904     if (ev >= 15)
0905         pi->last_force_request_resend = ceph_decode_32(p);
0906     else
0907         pi->last_force_request_resend = 0;
0908 
0909     if (ev >= 16)
0910         *p += 4; /* skip min_read_recency_for_promote */
0911 
0912     if (ev >= 17)
0913         *p += 8; /* skip expected_num_objects */
0914 
0915     if (ev >= 19)
0916         *p += 4; /* skip cache_target_dirty_high_ratio_micro */
0917 
0918     if (ev >= 20)
0919         *p += 4; /* skip min_write_recency_for_promote */
0920 
0921     if (ev >= 21)
0922         *p += 1; /* skip use_gmt_hitset */
0923 
0924     if (ev >= 22)
0925         *p += 1; /* skip fast_read */
0926 
0927     if (ev >= 23) {
0928         *p += 4; /* skip hit_set_grade_decay_rate */
0929         *p += 4; /* skip hit_set_search_last_n */
0930     }
0931 
0932     if (ev >= 24) {
0933         /* skip opts */
0934         *p += 1 + 1; /* versions */
0935         len = ceph_decode_32(p);
0936         *p += len;
0937     }
0938 
0939     if (ev >= 25)
0940         pi->last_force_request_resend = ceph_decode_32(p);
0941 
0942     /* ignore the rest */
0943 
0944     *p = pool_end;
0945     calc_pg_masks(pi);
0946     return 0;
0947 
0948 bad:
0949     return -EINVAL;
0950 }
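
/*
 * Because the pool struct is length-prefixed, *p is advanced to pool_end
 * unconditionally above: fields added by encoding versions newer than
 * those handled here are skipped instead of being treated as corruption.
 */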
0951 
0952 static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
0953 {
0954     struct ceph_pg_pool_info *pi;
0955     u32 num, len;
0956     u64 pool;
0957 
0958     ceph_decode_32_safe(p, end, num, bad);
0959     dout(" %d pool names\n", num);
0960     while (num--) {
0961         ceph_decode_64_safe(p, end, pool, bad);
0962         ceph_decode_32_safe(p, end, len, bad);
0963         dout("  pool %llu len %d\n", pool, len);
0964         ceph_decode_need(p, end, len, bad);
0965         pi = lookup_pg_pool(&map->pg_pools, pool);
0966         if (pi) {
0967             char *name = kstrndup(*p, len, GFP_NOFS);
0968 
0969             if (!name)
0970                 return -ENOMEM;
0971             kfree(pi->name);
0972             pi->name = name;
0973             dout("  name is %s\n", pi->name);
0974         }
0975         *p += len;
0976     }
0977     return 0;
0978 
0979 bad:
0980     return -EINVAL;
0981 }
0982 
0983 /*
0984  * CRUSH workspaces
0985  *
0986  * workspace_manager framework borrowed from fs/btrfs/compression.c.
0987  * Two simplifications: there is only one type of workspace and there
0988  * is always at least one workspace.
0989  */
0990 static struct crush_work *alloc_workspace(const struct crush_map *c)
0991 {
0992     struct crush_work *work;
0993     size_t work_size;
0994 
0995     WARN_ON(!c->working_size);
0996     work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
0997     dout("%s work_size %zu bytes\n", __func__, work_size);
0998 
0999     work = kvmalloc(work_size, GFP_NOIO);
1000     if (!work)
1001         return NULL;
1002 
1003     INIT_LIST_HEAD(&work->item);
1004     crush_init_workspace(c, work);
1005     return work;
1006 }
1007 
1008 static void free_workspace(struct crush_work *work)
1009 {
1010     WARN_ON(!list_empty(&work->item));
1011     kvfree(work);
1012 }
1013 
1014 static void init_workspace_manager(struct workspace_manager *wsm)
1015 {
1016     INIT_LIST_HEAD(&wsm->idle_ws);
1017     spin_lock_init(&wsm->ws_lock);
1018     atomic_set(&wsm->total_ws, 0);
1019     wsm->free_ws = 0;
1020     init_waitqueue_head(&wsm->ws_wait);
1021 }
1022 
1023 static void add_initial_workspace(struct workspace_manager *wsm,
1024                   struct crush_work *work)
1025 {
1026     WARN_ON(!list_empty(&wsm->idle_ws));
1027 
1028     list_add(&work->item, &wsm->idle_ws);
1029     atomic_set(&wsm->total_ws, 1);
1030     wsm->free_ws = 1;
1031 }
1032 
1033 static void cleanup_workspace_manager(struct workspace_manager *wsm)
1034 {
1035     struct crush_work *work;
1036 
1037     while (!list_empty(&wsm->idle_ws)) {
1038         work = list_first_entry(&wsm->idle_ws, struct crush_work,
1039                     item);
1040         list_del_init(&work->item);
1041         free_workspace(work);
1042     }
1043     atomic_set(&wsm->total_ws, 0);
1044     wsm->free_ws = 0;
1045 }
1046 
1047 /*
1048  * Finds an available workspace or allocates a new one.  If it's not
1049  * possible to allocate a new one, waits until there is one.
1050  */
1051 static struct crush_work *get_workspace(struct workspace_manager *wsm,
1052                     const struct crush_map *c)
1053 {
1054     struct crush_work *work;
1055     int cpus = num_online_cpus();
1056 
1057 again:
1058     spin_lock(&wsm->ws_lock);
1059     if (!list_empty(&wsm->idle_ws)) {
1060         work = list_first_entry(&wsm->idle_ws, struct crush_work,
1061                     item);
1062         list_del_init(&work->item);
1063         wsm->free_ws--;
1064         spin_unlock(&wsm->ws_lock);
1065         return work;
1066 
1067     }
1068     if (atomic_read(&wsm->total_ws) > cpus) {
1069         DEFINE_WAIT(wait);
1070 
1071         spin_unlock(&wsm->ws_lock);
1072         prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
1073         if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
1074             schedule();
1075         finish_wait(&wsm->ws_wait, &wait);
1076         goto again;
1077     }
1078     atomic_inc(&wsm->total_ws);
1079     spin_unlock(&wsm->ws_lock);
1080 
1081     work = alloc_workspace(c);
1082     if (!work) {
1083         atomic_dec(&wsm->total_ws);
1084         wake_up(&wsm->ws_wait);
1085 
1086         /*
1087          * Do not return the error but go back to waiting.  We
1088          * have the initial workspace and the CRUSH computation
1089          * time is bounded so we will get it eventually.
1090          */
1091         WARN_ON(atomic_read(&wsm->total_ws) < 1);
1092         goto again;
1093     }
1094     return work;
1095 }
1096 
1097 /*
1098  * Puts a workspace back on the list or frees it if we have enough
1099  * idle ones sitting around.
1100  */
1101 static void put_workspace(struct workspace_manager *wsm,
1102               struct crush_work *work)
1103 {
1104     spin_lock(&wsm->ws_lock);
1105     if (wsm->free_ws <= num_online_cpus()) {
1106         list_add(&work->item, &wsm->idle_ws);
1107         wsm->free_ws++;
1108         spin_unlock(&wsm->ws_lock);
1109         goto wake;
1110     }
1111     spin_unlock(&wsm->ws_lock);
1112 
1113     free_workspace(work);
1114     atomic_dec(&wsm->total_ws);
1115 wake:
1116     if (wq_has_sleeper(&wsm->ws_wait))
1117         wake_up(&wsm->ws_wait);
1118 }
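
/*
 * A typical caller brackets a CRUSH computation with the two helpers; a
 * minimal sketch (using the map fields set up later in this file):
 *
 *	struct crush_work *work;
 *
 *	work = get_workspace(&map->crush_wsm, map->crush);
 *	... run the CRUSH mapping with work as scratch space ...
 *	put_workspace(&map->crush_wsm, work);
 */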
1119 
1120 /*
1121  * osd map
1122  */
1123 struct ceph_osdmap *ceph_osdmap_alloc(void)
1124 {
1125     struct ceph_osdmap *map;
1126 
1127     map = kzalloc(sizeof(*map), GFP_NOIO);
1128     if (!map)
1129         return NULL;
1130 
1131     map->pg_pools = RB_ROOT;
1132     map->pool_max = -1;
1133     map->pg_temp = RB_ROOT;
1134     map->primary_temp = RB_ROOT;
1135     map->pg_upmap = RB_ROOT;
1136     map->pg_upmap_items = RB_ROOT;
1137 
1138     init_workspace_manager(&map->crush_wsm);
1139 
1140     return map;
1141 }
1142 
1143 void ceph_osdmap_destroy(struct ceph_osdmap *map)
1144 {
1145     dout("osdmap_destroy %p\n", map);
1146 
1147     if (map->crush)
1148         crush_destroy(map->crush);
1149     cleanup_workspace_manager(&map->crush_wsm);
1150 
1151     while (!RB_EMPTY_ROOT(&map->pg_temp)) {
1152         struct ceph_pg_mapping *pg =
1153             rb_entry(rb_first(&map->pg_temp),
1154                  struct ceph_pg_mapping, node);
1155         erase_pg_mapping(&map->pg_temp, pg);
1156         free_pg_mapping(pg);
1157     }
1158     while (!RB_EMPTY_ROOT(&map->primary_temp)) {
1159         struct ceph_pg_mapping *pg =
1160             rb_entry(rb_first(&map->primary_temp),
1161                  struct ceph_pg_mapping, node);
1162         erase_pg_mapping(&map->primary_temp, pg);
1163         free_pg_mapping(pg);
1164     }
1165     while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
1166         struct ceph_pg_mapping *pg =
1167             rb_entry(rb_first(&map->pg_upmap),
1168                  struct ceph_pg_mapping, node);
1169         rb_erase(&pg->node, &map->pg_upmap);
1170         kfree(pg);
1171     }
1172     while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
1173         struct ceph_pg_mapping *pg =
1174             rb_entry(rb_first(&map->pg_upmap_items),
1175                  struct ceph_pg_mapping, node);
1176         rb_erase(&pg->node, &map->pg_upmap_items);
1177         kfree(pg);
1178     }
1179     while (!RB_EMPTY_ROOT(&map->pg_pools)) {
1180         struct ceph_pg_pool_info *pi =
1181             rb_entry(rb_first(&map->pg_pools),
1182                  struct ceph_pg_pool_info, node);
1183         __remove_pg_pool(&map->pg_pools, pi);
1184     }
1185     kvfree(map->osd_state);
1186     kvfree(map->osd_weight);
1187     kvfree(map->osd_addr);
1188     kvfree(map->osd_primary_affinity);
1189     kfree(map);
1190 }
1191 
1192 /*
1193  * Adjust max_osd value, (re)allocate arrays.
1194  *
1195  * The new elements are properly initialized.
1196  */
1197 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
1198 {
1199     u32 *state;
1200     u32 *weight;
1201     struct ceph_entity_addr *addr;
1202     u32 to_copy;
1203     int i;
1204 
1205     dout("%s old %u new %u\n", __func__, map->max_osd, max);
1206     if (max == map->max_osd)
1207         return 0;
1208 
1209     state = kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
1210     weight = kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
1211     addr = kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
1212     if (!state || !weight || !addr) {
1213         kvfree(state);
1214         kvfree(weight);
1215         kvfree(addr);
1216         return -ENOMEM;
1217     }
1218 
1219     to_copy = min(map->max_osd, max);
1220     if (map->osd_state) {
1221         memcpy(state, map->osd_state, to_copy * sizeof(*state));
1222         memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
1223         memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
1224         kvfree(map->osd_state);
1225         kvfree(map->osd_weight);
1226         kvfree(map->osd_addr);
1227     }
1228 
1229     map->osd_state = state;
1230     map->osd_weight = weight;
1231     map->osd_addr = addr;
1232     for (i = map->max_osd; i < max; i++) {
1233         map->osd_state[i] = 0;
1234         map->osd_weight[i] = CEPH_OSD_OUT;
1235         memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
1236     }
1237 
1238     if (map->osd_primary_affinity) {
1239         u32 *affinity;
1240 
1241         affinity = kvmalloc(array_size(max, sizeof(*affinity)),
1242                      GFP_NOFS);
1243         if (!affinity)
1244             return -ENOMEM;
1245 
1246         memcpy(affinity, map->osd_primary_affinity,
1247                to_copy * sizeof(*affinity));
1248         kvfree(map->osd_primary_affinity);
1249 
1250         map->osd_primary_affinity = affinity;
1251         for (i = map->max_osd; i < max; i++)
1252             map->osd_primary_affinity[i] =
1253                 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1254     }
1255 
1256     map->max_osd = max;
1257 
1258     return 0;
1259 }
1260 
1261 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
1262 {
1263     struct crush_work *work;
1264 
1265     if (IS_ERR(crush))
1266         return PTR_ERR(crush);
1267 
1268     work = alloc_workspace(crush);
1269     if (!work) {
1270         crush_destroy(crush);
1271         return -ENOMEM;
1272     }
1273 
1274     if (map->crush)
1275         crush_destroy(map->crush);
1276     cleanup_workspace_manager(&map->crush_wsm);
1277     map->crush = crush;
1278     add_initial_workspace(&map->crush_wsm, work);
1279     return 0;
1280 }
1281 
1282 #define OSDMAP_WRAPPER_COMPAT_VER   7
1283 #define OSDMAP_CLIENT_DATA_COMPAT_VER   1
1284 
1285 /*
1286  * Return 0 or error.  On success, *v is set to 0 for old (v6) osdmaps,
1287  * or to the struct_v of the client_data section for new (v7 and above)
1288  * osdmaps.
1289  */
1290 static int get_osdmap_client_data_v(void **p, void *end,
1291                     const char *prefix, u8 *v)
1292 {
1293     u8 struct_v;
1294 
1295     ceph_decode_8_safe(p, end, struct_v, e_inval);
1296     if (struct_v >= 7) {
1297         u8 struct_compat;
1298 
1299         ceph_decode_8_safe(p, end, struct_compat, e_inval);
1300         if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
1301             pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
1302                 struct_v, struct_compat,
1303                 OSDMAP_WRAPPER_COMPAT_VER, prefix);
1304             return -EINVAL;
1305         }
1306         *p += 4; /* ignore wrapper struct_len */
1307 
1308         ceph_decode_8_safe(p, end, struct_v, e_inval);
1309         ceph_decode_8_safe(p, end, struct_compat, e_inval);
1310         if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
1311             pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
1312                 struct_v, struct_compat,
1313                 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
1314             return -EINVAL;
1315         }
1316         *p += 4; /* ignore client data struct_len */
1317     } else {
1318         u16 version;
1319 
1320         *p -= 1;
1321         ceph_decode_16_safe(p, end, version, e_inval);
1322         if (version < 6) {
1323             pr_warn("got v %d < 6 of %s ceph_osdmap\n",
1324                 version, prefix);
1325             return -EINVAL;
1326         }
1327 
1328         /* old osdmap encoding */
1329         struct_v = 0;
1330     }
1331 
1332     *v = struct_v;
1333     return 0;
1334 
1335 e_inval:
1336     return -EINVAL;
1337 }
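
/*
 * In other words, a v7+ ("new") osdmap begins with two nested envelopes,
 * each a (struct_v, struct_compat, struct_len) triple:
 *
 *	u8 struct_v (>= 7), u8 struct_compat, u32 struct_len	wrapper
 *	u8 struct_v, u8 struct_compat, u32 struct_len		client data
 *
 * whereas an old map has only a u16 version (>= 6) in that position,
 * which is why *p is rewound one byte before re-reading it as a u16.
 */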
1338 
1339 static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
1340               bool incremental)
1341 {
1342     u32 n;
1343 
1344     ceph_decode_32_safe(p, end, n, e_inval);
1345     while (n--) {
1346         struct ceph_pg_pool_info *pi;
1347         u64 pool;
1348         int ret;
1349 
1350         ceph_decode_64_safe(p, end, pool, e_inval);
1351 
1352         pi = lookup_pg_pool(&map->pg_pools, pool);
1353         if (!incremental || !pi) {
1354             pi = kzalloc(sizeof(*pi), GFP_NOFS);
1355             if (!pi)
1356                 return -ENOMEM;
1357 
1358             RB_CLEAR_NODE(&pi->node);
1359             pi->id = pool;
1360 
1361             if (!__insert_pg_pool(&map->pg_pools, pi)) {
1362                 kfree(pi);
1363                 return -EEXIST;
1364             }
1365         }
1366 
1367         ret = decode_pool(p, end, pi);
1368         if (ret)
1369             return ret;
1370     }
1371 
1372     return 0;
1373 
1374 e_inval:
1375     return -EINVAL;
1376 }
1377 
1378 static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
1379 {
1380     return __decode_pools(p, end, map, false);
1381 }
1382 
1383 static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
1384 {
1385     return __decode_pools(p, end, map, true);
1386 }
1387 
1388 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
1389 
1390 static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
1391                  decode_mapping_fn_t fn, bool incremental)
1392 {
1393     u32 n;
1394 
1395     WARN_ON(!incremental && !fn);
1396 
1397     ceph_decode_32_safe(p, end, n, e_inval);
1398     while (n--) {
1399         struct ceph_pg_mapping *pg;
1400         struct ceph_pg pgid;
1401         int ret;
1402 
1403         ret = ceph_decode_pgid(p, end, &pgid);
1404         if (ret)
1405             return ret;
1406 
1407         pg = lookup_pg_mapping(mapping_root, &pgid);
1408         if (pg) {
1409             WARN_ON(!incremental);
1410             erase_pg_mapping(mapping_root, pg);
1411             free_pg_mapping(pg);
1412         }
1413 
1414         if (fn) {
1415             pg = fn(p, end, incremental);
1416             if (IS_ERR(pg))
1417                 return PTR_ERR(pg);
1418 
1419             if (pg) {
1420                 pg->pgid = pgid; /* struct */
1421                 insert_pg_mapping(mapping_root, pg);
1422             }
1423         }
1424     }
1425 
1426     return 0;
1427 
1428 e_inval:
1429     return -EINVAL;
1430 }
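
/*
 * Note that fn == NULL (valid only for incrementals, hence the WARN_ON
 * above) decodes a bare list of pgids and just removes any existing
 * mappings for them; the decode_old_pg_upmap* helpers below use this.
 */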
1431 
1432 static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
1433                         bool incremental)
1434 {
1435     struct ceph_pg_mapping *pg;
1436     u32 len, i;
1437 
1438     ceph_decode_32_safe(p, end, len, e_inval);
1439     if (len == 0 && incremental)
1440         return NULL;    /* new_pg_temp: [] to remove */
1441     if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
1442         return ERR_PTR(-EINVAL);
1443 
1444     ceph_decode_need(p, end, len * sizeof(u32), e_inval);
1445     pg = alloc_pg_mapping(len * sizeof(u32));
1446     if (!pg)
1447         return ERR_PTR(-ENOMEM);
1448 
1449     pg->pg_temp.len = len;
1450     for (i = 0; i < len; i++)
1451         pg->pg_temp.osds[i] = ceph_decode_32(p);
1452 
1453     return pg;
1454 
1455 e_inval:
1456     return ERR_PTR(-EINVAL);
1457 }
1458 
1459 static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
1460 {
1461     return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
1462                  false);
1463 }
1464 
1465 static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
1466 {
1467     return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
1468                  true);
1469 }
1470 
1471 static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
1472                              bool incremental)
1473 {
1474     struct ceph_pg_mapping *pg;
1475     u32 osd;
1476 
1477     ceph_decode_32_safe(p, end, osd, e_inval);
1478     if (osd == (u32)-1 && incremental)
1479         return NULL;    /* new_primary_temp: -1 to remove */
1480 
1481     pg = alloc_pg_mapping(0);
1482     if (!pg)
1483         return ERR_PTR(-ENOMEM);
1484 
1485     pg->primary_temp.osd = osd;
1486     return pg;
1487 
1488 e_inval:
1489     return ERR_PTR(-EINVAL);
1490 }
1491 
1492 static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
1493 {
1494     return decode_pg_mapping(p, end, &map->primary_temp,
1495                  __decode_primary_temp, false);
1496 }
1497 
1498 static int decode_new_primary_temp(void **p, void *end,
1499                    struct ceph_osdmap *map)
1500 {
1501     return decode_pg_mapping(p, end, &map->primary_temp,
1502                  __decode_primary_temp, true);
1503 }
1504 
1505 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
1506 {
1507     BUG_ON(osd >= map->max_osd);
1508 
1509     if (!map->osd_primary_affinity)
1510         return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1511 
1512     return map->osd_primary_affinity[osd];
1513 }
1514 
1515 static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
1516 {
1517     BUG_ON(osd >= map->max_osd);
1518 
1519     if (!map->osd_primary_affinity) {
1520         int i;
1521 
1522         map->osd_primary_affinity = kvmalloc(
1523             array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
1524             GFP_NOFS);
1525         if (!map->osd_primary_affinity)
1526             return -ENOMEM;
1527 
1528         for (i = 0; i < map->max_osd; i++)
1529             map->osd_primary_affinity[i] =
1530                 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
1531     }
1532 
1533     map->osd_primary_affinity[osd] = aff;
1534 
1535     return 0;
1536 }
1537 
1538 static int decode_primary_affinity(void **p, void *end,
1539                    struct ceph_osdmap *map)
1540 {
1541     u32 len, i;
1542 
1543     ceph_decode_32_safe(p, end, len, e_inval);
1544     if (len == 0) {
1545         kvfree(map->osd_primary_affinity);
1546         map->osd_primary_affinity = NULL;
1547         return 0;
1548     }
1549     if (len != map->max_osd)
1550         goto e_inval;
1551 
1552     ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
1553 
1554     for (i = 0; i < map->max_osd; i++) {
1555         int ret;
1556 
1557         ret = set_primary_affinity(map, i, ceph_decode_32(p));
1558         if (ret)
1559             return ret;
1560     }
1561 
1562     return 0;
1563 
1564 e_inval:
1565     return -EINVAL;
1566 }
1567 
1568 static int decode_new_primary_affinity(void **p, void *end,
1569                        struct ceph_osdmap *map)
1570 {
1571     u32 n;
1572 
1573     ceph_decode_32_safe(p, end, n, e_inval);
1574     while (n--) {
1575         u32 osd, aff;
1576         int ret;
1577 
1578         ceph_decode_32_safe(p, end, osd, e_inval);
1579         ceph_decode_32_safe(p, end, aff, e_inval);
1580 
1581         ret = set_primary_affinity(map, osd, aff);
1582         if (ret)
1583             return ret;
1584 
1585         osdmap_info(map, "osd%d primary-affinity 0x%x\n", osd, aff);
1586     }
1587 
1588     return 0;
1589 
1590 e_inval:
1591     return -EINVAL;
1592 }
1593 
1594 static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
1595                          bool __unused)
1596 {
1597     return __decode_pg_temp(p, end, false);
1598 }
1599 
1600 static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1601 {
1602     return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
1603                  false);
1604 }
1605 
1606 static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1607 {
1608     return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
1609                  true);
1610 }
1611 
1612 static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1613 {
1614     return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
1615 }
1616 
1617 static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
1618                                bool __unused)
1619 {
1620     struct ceph_pg_mapping *pg;
1621     u32 len, i;
1622 
1623     ceph_decode_32_safe(p, end, len, e_inval);
1624     if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
1625         return ERR_PTR(-EINVAL);
1626 
1627     ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
1628     pg = alloc_pg_mapping(2 * len * sizeof(u32));
1629     if (!pg)
1630         return ERR_PTR(-ENOMEM);
1631 
1632     pg->pg_upmap_items.len = len;
1633     for (i = 0; i < len; i++) {
1634         pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
1635         pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
1636     }
1637 
1638     return pg;
1639 
1640 e_inval:
1641     return ERR_PTR(-EINVAL);
1642 }
1643 
1644 static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
1645 {
1646     return decode_pg_mapping(p, end, &map->pg_upmap_items,
1647                  __decode_pg_upmap_items, false);
1648 }
1649 
1650 static int decode_new_pg_upmap_items(void **p, void *end,
1651                      struct ceph_osdmap *map)
1652 {
1653     return decode_pg_mapping(p, end, &map->pg_upmap_items,
1654                  __decode_pg_upmap_items, true);
1655 }
1656 
1657 static int decode_old_pg_upmap_items(void **p, void *end,
1658                      struct ceph_osdmap *map)
1659 {
1660     return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
1661 }
1662 
1663 /*
1664  * decode a full map.
1665  */
1666 static int osdmap_decode(void **p, void *end, bool msgr2,
1667              struct ceph_osdmap *map)
1668 {
1669     u8 struct_v;
1670     u32 epoch = 0;
1671     void *start = *p;
1672     u32 max;
1673     u32 len, i;
1674     int err;
1675 
1676     dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1677 
1678     err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1679     if (err)
1680         goto bad;
1681 
1682     /* fsid, epoch, created, modified */
1683     ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1684              sizeof(map->created) + sizeof(map->modified), e_inval);
1685     ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
1686     epoch = map->epoch = ceph_decode_32(p);
1687     ceph_decode_copy(p, &map->created, sizeof(map->created));
1688     ceph_decode_copy(p, &map->modified, sizeof(map->modified));
1689 
1690     /* pools */
1691     err = decode_pools(p, end, map);
1692     if (err)
1693         goto bad;
1694 
1695     /* pool_name */
1696     err = decode_pool_names(p, end, map);
1697     if (err)
1698         goto bad;
1699 
1700     ceph_decode_32_safe(p, end, map->pool_max, e_inval);
1701 
1702     ceph_decode_32_safe(p, end, map->flags, e_inval);
1703 
1704     /* max_osd */
1705     ceph_decode_32_safe(p, end, max, e_inval);
1706 
1707     /* (re)alloc osd arrays */
1708     err = osdmap_set_max_osd(map, max);
1709     if (err)
1710         goto bad;
1711 
1712     /* osd_state, osd_weight, osd_addrs->client_addr */
1713     ceph_decode_need(p, end, 3*sizeof(u32) +
1714              map->max_osd*(struct_v >= 5 ? sizeof(u32) :
1715                                sizeof(u8)) +
1716                        sizeof(*map->osd_weight), e_inval);
1717     if (ceph_decode_32(p) != map->max_osd)
1718         goto e_inval;
1719 
1720     if (struct_v >= 5) {
1721         for (i = 0; i < map->max_osd; i++)
1722             map->osd_state[i] = ceph_decode_32(p);
1723     } else {
1724         for (i = 0; i < map->max_osd; i++)
1725             map->osd_state[i] = ceph_decode_8(p);
1726     }
1727 
1728     if (ceph_decode_32(p) != map->max_osd)
1729         goto e_inval;
1730 
1731     for (i = 0; i < map->max_osd; i++)
1732         map->osd_weight[i] = ceph_decode_32(p);
1733 
1734     if (ceph_decode_32(p) != map->max_osd)
1735         goto e_inval;
1736 
1737     for (i = 0; i < map->max_osd; i++) {
1738         struct ceph_entity_addr *addr = &map->osd_addr[i];
1739 
1740         if (struct_v >= 8)
1741             err = ceph_decode_entity_addrvec(p, end, msgr2, addr);
1742         else
1743             err = ceph_decode_entity_addr(p, end, addr);
1744         if (err)
1745             goto bad;
1746 
1747         dout("%s osd%d addr %s\n", __func__, i, ceph_pr_addr(addr));
1748     }
1749 
1750     /* pg_temp */
1751     err = decode_pg_temp(p, end, map);
1752     if (err)
1753         goto bad;
1754 
1755     /* primary_temp */
1756     if (struct_v >= 1) {
1757         err = decode_primary_temp(p, end, map);
1758         if (err)
1759             goto bad;
1760     }
1761 
1762     /* primary_affinity */
1763     if (struct_v >= 2) {
1764         err = decode_primary_affinity(p, end, map);
1765         if (err)
1766             goto bad;
1767     } else {
1768         WARN_ON(map->osd_primary_affinity);
1769     }
1770 
1771     /* crush */
1772     ceph_decode_32_safe(p, end, len, e_inval);
1773     err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
1774     if (err)
1775         goto bad;
1776 
1777     *p += len;
1778     if (struct_v >= 3) {
1779         /* erasure_code_profiles */
1780         ceph_decode_skip_map_of_map(p, end, string, string, string,
1781                         e_inval);
1782     }
1783 
1784     if (struct_v >= 4) {
1785         err = decode_pg_upmap(p, end, map);
1786         if (err)
1787             goto bad;
1788 
1789         err = decode_pg_upmap_items(p, end, map);
1790         if (err)
1791             goto bad;
1792     } else {
1793         WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
1794         WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
1795     }
1796 
1797     /* ignore the rest */
1798     *p = end;
1799 
1800     dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1801     return 0;
1802 
1803 e_inval:
1804     err = -EINVAL;
1805 bad:
1806     pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1807            err, epoch, (int)(*p - start), *p, start, end);
1808     print_hex_dump(KERN_DEBUG, "osdmap: ",
1809                DUMP_PREFIX_OFFSET, 16, 1,
1810                start, end - start, true);
1811     return err;
1812 }
1813 
1814 /*
1815  * Allocate and decode a full map.
1816  */
1817 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end, bool msgr2)
1818 {
1819     struct ceph_osdmap *map;
1820     int ret;
1821 
1822     map = ceph_osdmap_alloc();
1823     if (!map)
1824         return ERR_PTR(-ENOMEM);
1825 
1826     ret = osdmap_decode(p, end, msgr2, map);
1827     if (ret) {
1828         ceph_osdmap_destroy(map);
1829         return ERR_PTR(ret);
1830     }
1831 
1832     return map;
1833 }
1834 
1835 /*
1836  * Encoding order is (new_up_client, new_state, new_weight).  Need to
1837  * apply in the (new_weight, new_state, new_up_client) order, because
1838  * an incremental map may look like e.g.
1839  *
1840  *     new_up_client: { osd=6, addr=... } # set osd_state and addr
1841  *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1842  */
1843 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
1844                       bool msgr2, struct ceph_osdmap *map)
1845 {
1846     void *new_up_client;
1847     void *new_state;
1848     void *new_weight_end;
1849     u32 len;
1850     int ret;
1851     int i;
1852 
1853     new_up_client = *p;
1854     ceph_decode_32_safe(p, end, len, e_inval);
1855     for (i = 0; i < len; ++i) {
1856         struct ceph_entity_addr addr;
1857 
1858         ceph_decode_skip_32(p, end, e_inval);
1859         if (struct_v >= 7)
1860             ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1861         else
1862             ret = ceph_decode_entity_addr(p, end, &addr);
1863         if (ret)
1864             return ret;
1865     }
1866 
1867     new_state = *p;
1868     ceph_decode_32_safe(p, end, len, e_inval);
1869     len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
1870     ceph_decode_need(p, end, len, e_inval);
1871     *p += len;
1872 
1873     /* new_weight */
1874     ceph_decode_32_safe(p, end, len, e_inval);
1875     while (len--) {
1876         s32 osd;
1877         u32 w;
1878 
1879         ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
1880         osd = ceph_decode_32(p);
1881         w = ceph_decode_32(p);
1882         BUG_ON(osd >= map->max_osd);
1883         osdmap_info(map, "osd%d weight 0x%x %s\n", osd, w,
1884                 w == CEPH_OSD_IN ? "(in)" :
1885                 (w == CEPH_OSD_OUT ? "(out)" : ""));
1886         map->osd_weight[osd] = w;
1887 
1888         /*
1889          * If we are marking the OSD in, set the EXISTS bit and
1890          * clear the AUTOOUT and NEW bits.
1891          */
1892         if (w) {
1893             map->osd_state[osd] |= CEPH_OSD_EXISTS;
1894             map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
1895                          CEPH_OSD_NEW);
1896         }
1897     }
1898     new_weight_end = *p;
1899 
1900     /* new_state (up/down) */
1901     *p = new_state;
1902     len = ceph_decode_32(p);
1903     while (len--) {
1904         s32 osd;
1905         u32 xorstate;
1906 
1907         osd = ceph_decode_32(p);
1908         if (struct_v >= 5)
1909             xorstate = ceph_decode_32(p);
1910         else
1911             xorstate = ceph_decode_8(p);
1912         if (xorstate == 0)
1913             xorstate = CEPH_OSD_UP;
1914         BUG_ON(osd >= map->max_osd);
1915         if ((map->osd_state[osd] & CEPH_OSD_UP) &&
1916             (xorstate & CEPH_OSD_UP))
1917             osdmap_info(map, "osd%d down\n", osd);
1918         if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
1919             (xorstate & CEPH_OSD_EXISTS)) {
1920             osdmap_info(map, "osd%d does not exist\n", osd);
1921             ret = set_primary_affinity(map, osd,
1922                            CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1923             if (ret)
1924                 return ret;
1925             memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
1926             map->osd_state[osd] = 0;
1927         } else {
1928             map->osd_state[osd] ^= xorstate;
1929         }
1930     }
1931 
1932     /* new_up_client */
1933     *p = new_up_client;
1934     len = ceph_decode_32(p);
1935     while (len--) {
1936         s32 osd;
1937         struct ceph_entity_addr addr;
1938 
1939         osd = ceph_decode_32(p);
1940         BUG_ON(osd >= map->max_osd);
1941         if (struct_v >= 7)
1942             ret = ceph_decode_entity_addrvec(p, end, msgr2, &addr);
1943         else
1944             ret = ceph_decode_entity_addr(p, end, &addr);
1945         if (ret)
1946             return ret;
1947 
1948         dout("%s osd%d addr %s\n", __func__, osd, ceph_pr_addr(&addr));
1949 
1950         osdmap_info(map, "osd%d up\n", osd);
1951         map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1952         map->osd_addr[osd] = addr;
1953     }
1954 
1955     *p = new_weight_end;
1956     return 0;
1957 
1958 e_inval:
1959     return -EINVAL;
1960 }
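
/*
 * A minimal sketch of the save/skip/rewind pattern used by
 * decode_new_up_state_weight() above: remember where the earlier
 * sections start, skip ahead and apply the last section first, then
 * rewind.  The helper names and the two-section layout below are
 * illustrative only, not the Ceph wire format.
 */
static u32 demo_decode_u32(void **p)
{
	u32 v;

	memcpy(&v, *p, sizeof(v));	/* real code uses ceph_decode_32() */
	*p += sizeof(v);
	return v;
}

/* buf holds section A (u32 count, then count u32 entries), then section B */
static void demo_apply_b_then_a(void *buf)
{
	void *p = buf;
	void *section_a = p;
	u32 n = demo_decode_u32(&p);

	p += n * sizeof(u32);			/* skip A on the first pass */

	n = demo_decode_u32(&p);		/* decode and apply B */
	while (n--)
		(void)demo_decode_u32(&p);	/* ... apply B entry ... */

	p = section_a;				/* rewind and apply A */
	n = demo_decode_u32(&p);
	while (n--)
		(void)demo_decode_u32(&p);	/* ... apply A entry ... */
}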
1961 
1962 /*
1963  * decode and apply an incremental map update.
1964  */
1965 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bool msgr2,
1966                          struct ceph_osdmap *map)
1967 {
1968     struct ceph_fsid fsid;
1969     u32 epoch = 0;
1970     struct ceph_timespec modified;
1971     s32 len;
1972     u64 pool;
1973     __s64 new_pool_max;
1974     __s32 new_flags, max;
1975     void *start = *p;
1976     int err;
1977     u8 struct_v;
1978 
1979     dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1980 
1981     err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
1982     if (err)
1983         goto bad;
1984 
1985     /* fsid, epoch, modified, new_pool_max, new_flags */
1986     ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1987              sizeof(u64) + sizeof(u32), e_inval);
1988     ceph_decode_copy(p, &fsid, sizeof(fsid));
1989     epoch = ceph_decode_32(p);
1990     BUG_ON(epoch != map->epoch+1);
1991     ceph_decode_copy(p, &modified, sizeof(modified));
1992     new_pool_max = ceph_decode_64(p);
1993     new_flags = ceph_decode_32(p);
1994 
1995     /* full map? */
1996     ceph_decode_32_safe(p, end, len, e_inval);
1997     if (len > 0) {
1998         dout("apply_incremental full map len %d, %p to %p\n",
1999              len, *p, end);
2000         return ceph_osdmap_decode(p, min(*p+len, end), msgr2);
2001     }
2002 
2003     /* new crush? */
2004     ceph_decode_32_safe(p, end, len, e_inval);
2005     if (len > 0) {
2006         err = osdmap_set_crush(map,
2007                        crush_decode(*p, min(*p + len, end)));
2008         if (err)
2009             goto bad;
2010         *p += len;
2011     }
2012 
2013     /* new flags? */
2014     if (new_flags >= 0)
2015         map->flags = new_flags;
2016     if (new_pool_max >= 0)
2017         map->pool_max = new_pool_max;
2018 
2019     /* new max? */
2020     ceph_decode_32_safe(p, end, max, e_inval);
2021     if (max >= 0) {
2022         err = osdmap_set_max_osd(map, max);
2023         if (err)
2024             goto bad;
2025     }
2026 
2027     map->epoch++;
2028     map->modified = modified;
2029 
2030     /* new_pools */
2031     err = decode_new_pools(p, end, map);
2032     if (err)
2033         goto bad;
2034 
2035     /* new_pool_names */
2036     err = decode_pool_names(p, end, map);
2037     if (err)
2038         goto bad;
2039 
2040     /* old_pool */
2041     ceph_decode_32_safe(p, end, len, e_inval);
2042     while (len--) {
2043         struct ceph_pg_pool_info *pi;
2044 
2045         ceph_decode_64_safe(p, end, pool, e_inval);
2046         pi = lookup_pg_pool(&map->pg_pools, pool);
2047         if (pi)
2048             __remove_pg_pool(&map->pg_pools, pi);
2049     }
2050 
2051     /* new_up_client, new_state, new_weight */
2052     err = decode_new_up_state_weight(p, end, struct_v, msgr2, map);
2053     if (err)
2054         goto bad;
2055 
2056     /* new_pg_temp */
2057     err = decode_new_pg_temp(p, end, map);
2058     if (err)
2059         goto bad;
2060 
2061     /* new_primary_temp */
2062     if (struct_v >= 1) {
2063         err = decode_new_primary_temp(p, end, map);
2064         if (err)
2065             goto bad;
2066     }
2067 
2068     /* new_primary_affinity */
2069     if (struct_v >= 2) {
2070         err = decode_new_primary_affinity(p, end, map);
2071         if (err)
2072             goto bad;
2073     }
2074 
2075     if (struct_v >= 3) {
2076         /* new_erasure_code_profiles */
2077         ceph_decode_skip_map_of_map(p, end, string, string, string,
2078                         e_inval);
2079         /* old_erasure_code_profiles */
2080         ceph_decode_skip_set(p, end, string, e_inval);
2081     }
2082 
2083     if (struct_v >= 4) {
2084         err = decode_new_pg_upmap(p, end, map);
2085         if (err)
2086             goto bad;
2087 
2088         err = decode_old_pg_upmap(p, end, map);
2089         if (err)
2090             goto bad;
2091 
2092         err = decode_new_pg_upmap_items(p, end, map);
2093         if (err)
2094             goto bad;
2095 
2096         err = decode_old_pg_upmap_items(p, end, map);
2097         if (err)
2098             goto bad;
2099     }
2100 
2101     /* ignore the rest */
2102     *p = end;
2103 
2104     dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
2105     return map;
2106 
2107 e_inval:
2108     err = -EINVAL;
2109 bad:
2110     pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
2111            err, epoch, (int)(*p - start), *p, start, end);
2112     print_hex_dump(KERN_DEBUG, "osdmap: ",
2113                DUMP_PREFIX_OFFSET, 16, 1,
2114                start, end - start, true);
2115     return ERR_PTR(err);
2116 }
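
/*
 * A hedged sketch of the caller side (the surrounding names are
 * illustrative): because an incremental can embed a full map,
 * osdmap_apply_incremental() may hand back a brand-new map instead of
 * @map updated in place, so callers must be ready to swap and destroy.
 */
static struct ceph_osdmap *demo_apply_inc(struct ceph_osdmap *cur,
					  void **p, void *end, bool msgr2)
{
	struct ceph_osdmap *newmap;

	newmap = osdmap_apply_incremental(p, end, msgr2, cur);
	if (IS_ERR(newmap))
		return newmap;

	if (newmap != cur)
		ceph_osdmap_destroy(cur);	/* a full map was embedded */
	return newmap;
}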
2117 
2118 void ceph_oloc_copy(struct ceph_object_locator *dest,
2119             const struct ceph_object_locator *src)
2120 {
2121     ceph_oloc_destroy(dest);
2122 
2123     dest->pool = src->pool;
2124     if (src->pool_ns)
2125         dest->pool_ns = ceph_get_string(src->pool_ns);
2126     else
2127         dest->pool_ns = NULL;
2128 }
2129 EXPORT_SYMBOL(ceph_oloc_copy);
2130 
2131 void ceph_oloc_destroy(struct ceph_object_locator *oloc)
2132 {
2133     ceph_put_string(oloc->pool_ns);
2134 }
2135 EXPORT_SYMBOL(ceph_oloc_destroy);
2136 
2137 void ceph_oid_copy(struct ceph_object_id *dest,
2138            const struct ceph_object_id *src)
2139 {
2140     ceph_oid_destroy(dest);
2141 
2142     if (src->name != src->inline_name) {
2143         /* very rare, see ceph_object_id definition */
2144         dest->name = kmalloc(src->name_len + 1,
2145                      GFP_NOIO | __GFP_NOFAIL);
2146     } else {
2147         dest->name = dest->inline_name;
2148     }
2149     memcpy(dest->name, src->name, src->name_len + 1);
2150     dest->name_len = src->name_len;
2151 }
2152 EXPORT_SYMBOL(ceph_oid_copy);
2153 
2154 static __printf(2, 0)
2155 int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
2156 {
2157     int len;
2158 
2159     WARN_ON(!ceph_oid_empty(oid));
2160 
2161     len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
2162     if (len >= sizeof(oid->inline_name))
2163         return len;
2164 
2165     oid->name_len = len;
2166     return 0;
2167 }
2168 
2169 /*
2170  * If oid doesn't fit into inline buffer, BUG.
2171  */
2172 void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
2173 {
2174     va_list ap;
2175 
2176     va_start(ap, fmt);
2177     BUG_ON(oid_printf_vargs(oid, fmt, ap));
2178     va_end(ap);
2179 }
2180 EXPORT_SYMBOL(ceph_oid_printf);
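
/*
 * Illustrative use of ceph_oid_printf(); the "%llx.%08llx" shape is a
 * made-up example and ceph_oid_init() is assumed from the osdmap
 * header.  Short fixed-width formats like this always fit in
 * inline_name, so the BUG_ON() above cannot fire.
 */
static void demo_name_block(struct ceph_object_id *oid, u64 ino, u64 bno)
{
	ceph_oid_init(oid);
	ceph_oid_printf(oid, "%llx.%08llx", ino, bno);
}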
2181 
2182 static __printf(3, 0)
2183 int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
2184               const char *fmt, va_list ap)
2185 {
2186     va_list aq;
2187     int len;
2188 
2189     va_copy(aq, ap);
2190     len = oid_printf_vargs(oid, fmt, aq);
2191     va_end(aq);
2192 
2193     if (len) {
2194         char *external_name;
2195 
2196         external_name = kmalloc(len + 1, gfp);
2197         if (!external_name)
2198             return -ENOMEM;
2199 
2200         oid->name = external_name;
2201         WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
2202         oid->name_len = len;
2203     }
2204 
2205     return 0;
2206 }
2207 
2208 /*
2209  * If oid doesn't fit into inline buffer, allocate.
2210  */
2211 int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
2212              const char *fmt, ...)
2213 {
2214     va_list ap;
2215     int ret;
2216 
2217     va_start(ap, fmt);
2218     ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
2219     va_end(ap);
2220 
2221     return ret;
2222 }
2223 EXPORT_SYMBOL(ceph_oid_aprintf);
2224 
2225 void ceph_oid_destroy(struct ceph_object_id *oid)
2226 {
2227     if (oid->name != oid->inline_name)
2228         kfree(oid->name);
2229 }
2230 EXPORT_SYMBOL(ceph_oid_destroy);
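
/*
 * Illustrative lifecycle of a heap-backed object id (the format string
 * and GFP flags are made up): names that overflow the inline buffer
 * are allocated by ceph_oid_aprintf() and must be released with
 * ceph_oid_destroy().
 */
static int demo_long_name(void)
{
	struct ceph_object_id oid;
	int ret;

	ceph_oid_init(&oid);
	ret = ceph_oid_aprintf(&oid, GFP_KERNEL,
			       "a-deliberately-long-prefix-%016llx-%016llx",
			       1ULL, 2ULL);
	if (ret)
		return ret;	/* -ENOMEM, oid is still empty */

	/* ... use oid ... */

	ceph_oid_destroy(&oid);
	return 0;
}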
2231 
2232 /*
2233  * osds only
2234  */
2235 static bool __osds_equal(const struct ceph_osds *lhs,
2236              const struct ceph_osds *rhs)
2237 {
2238     if (lhs->size == rhs->size &&
2239         !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
2240         return true;
2241 
2242     return false;
2243 }
2244 
2245 /*
2246  * osds + primary
2247  */
2248 static bool osds_equal(const struct ceph_osds *lhs,
2249                const struct ceph_osds *rhs)
2250 {
2251     if (__osds_equal(lhs, rhs) &&
2252         lhs->primary == rhs->primary)
2253         return true;
2254 
2255     return false;
2256 }
2257 
2258 static bool osds_valid(const struct ceph_osds *set)
2259 {
2260     /* non-empty set */
2261     if (set->size > 0 && set->primary >= 0)
2262         return true;
2263 
2264     /* empty can_shift_osds set */
2265     if (!set->size && set->primary == -1)
2266         return true;
2267 
2268     /* empty !can_shift_osds set - all NONE */
2269     if (set->size > 0 && set->primary == -1) {
2270         int i;
2271 
2272         for (i = 0; i < set->size; i++) {
2273             if (set->osds[i] != CRUSH_ITEM_NONE)
2274                 break;
2275         }
2276         if (i == set->size)
2277             return true;
2278     }
2279 
2280     return false;
2281 }
2282 
2283 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
2284 {
2285     memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
2286     dest->size = src->size;
2287     dest->primary = src->primary;
2288 }
2289 
2290 bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
2291               u32 new_pg_num)
2292 {
2293     int old_bits = calc_bits_of(old_pg_num);
2294     int old_mask = (1 << old_bits) - 1;
2295     int n;
2296 
2297     WARN_ON(pgid->seed >= old_pg_num);
2298     if (new_pg_num <= old_pg_num)
2299         return false;
2300 
2301     for (n = 1; ; n++) {
2302         int next_bit = n << (old_bits - 1);
2303         u32 s = next_bit | pgid->seed;
2304 
2305         if (s < old_pg_num || s == pgid->seed)
2306             continue;
2307         if (s >= new_pg_num)
2308             break;
2309 
2310         s = ceph_stable_mod(s, old_pg_num, old_mask);
2311         if (s == pgid->seed)
2312             return true;
2313     }
2314 
2315     return false;
2316 }
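
/*
 * Worked example: a pool grows from pg_num 4 to 8 and we ask about
 * seed 1.  old_bits = calc_bits_of(4) = 3, old_mask = 7.  At n = 1,
 * next_bit = 1 << 2 = 4 and s = 4 | 1 = 5: 5 is a genuinely new PG
 * (4 <= 5 < 8), and ceph_stable_mod(5, 4, 7) = 5 & 3 = 1 folds back
 * onto the original seed, so PG <pool>.1 gains child <pool>.5 and the
 * function returns true.
 */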
2317 
2318 bool ceph_is_new_interval(const struct ceph_osds *old_acting,
2319               const struct ceph_osds *new_acting,
2320               const struct ceph_osds *old_up,
2321               const struct ceph_osds *new_up,
2322               int old_size,
2323               int new_size,
2324               int old_min_size,
2325               int new_min_size,
2326               u32 old_pg_num,
2327               u32 new_pg_num,
2328               bool old_sort_bitwise,
2329               bool new_sort_bitwise,
2330               bool old_recovery_deletes,
2331               bool new_recovery_deletes,
2332               const struct ceph_pg *pgid)
2333 {
2334     return !osds_equal(old_acting, new_acting) ||
2335            !osds_equal(old_up, new_up) ||
2336            old_size != new_size ||
2337            old_min_size != new_min_size ||
2338            ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
2339            old_sort_bitwise != new_sort_bitwise ||
2340            old_recovery_deletes != new_recovery_deletes;
2341 }
2342 
2343 static int calc_pg_rank(int osd, const struct ceph_osds *acting)
2344 {
2345     int i;
2346 
2347     for (i = 0; i < acting->size; i++) {
2348         if (acting->osds[i] == osd)
2349             return i;
2350     }
2351 
2352     return -1;
2353 }
2354 
2355 static bool primary_changed(const struct ceph_osds *old_acting,
2356                 const struct ceph_osds *new_acting)
2357 {
2358     if (!old_acting->size && !new_acting->size)
2359         return false; /* both still empty */
2360 
2361     if (!old_acting->size ^ !new_acting->size)
2362         return true; /* was empty, now not, or vice versa */
2363 
2364     if (old_acting->primary != new_acting->primary)
2365         return true; /* primary changed */
2366 
2367     if (calc_pg_rank(old_acting->primary, old_acting) !=
2368         calc_pg_rank(new_acting->primary, new_acting))
2369         return true;
2370 
2371     return false; /* same primary (though replicas may have changed) */
2372 }
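
/*
 * Worked example: old acting [1, 3] and new acting [3, 1], primary 3
 * in both.  The primary's id is unchanged but its rank moves from 1
 * to 0, so primary_changed() returns true; for interval purposes the
 * primary's position in the set matters, not just its identity.
 */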
2373 
2374 bool ceph_osds_changed(const struct ceph_osds *old_acting,
2375                const struct ceph_osds *new_acting,
2376                bool any_change)
2377 {
2378     if (primary_changed(old_acting, new_acting))
2379         return true;
2380 
2381     if (any_change && !__osds_equal(old_acting, new_acting))
2382         return true;
2383 
2384     return false;
2385 }
2386 
2387 /*
2388  * Map an object into a PG.
2389  *
2390  * Should only be called with target_oid and target_oloc (as opposed to
2391  * base_oid and base_oloc), since tiering isn't taken into account.
2392  */
2393 void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2394                  const struct ceph_object_id *oid,
2395                  const struct ceph_object_locator *oloc,
2396                  struct ceph_pg *raw_pgid)
2397 {
2398     WARN_ON(pi->id != oloc->pool);
2399 
2400     if (!oloc->pool_ns) {
2401         raw_pgid->pool = oloc->pool;
2402         raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
2403                          oid->name_len);
2404         dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
2405              raw_pgid->pool, raw_pgid->seed);
2406     } else {
2407         char stack_buf[256];
2408         char *buf = stack_buf;
2409         int nsl = oloc->pool_ns->len;
2410         size_t total = nsl + 1 + oid->name_len;
2411 
2412         if (total > sizeof(stack_buf))
2413             buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
2414         memcpy(buf, oloc->pool_ns->str, nsl);
2415         buf[nsl] = '\037';
2416         memcpy(buf + nsl + 1, oid->name, oid->name_len);
2417         raw_pgid->pool = oloc->pool;
2418         raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
2419         if (buf != stack_buf)
2420             kfree(buf);
2421         dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
2422              oid->name, nsl, oloc->pool_ns->str,
2423              raw_pgid->pool, raw_pgid->seed);
2424     }
2425 }
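
/*
 * A sketch of the namespaced key built above: the hash input is just
 * "<pool_ns>" + '\037' (ASCII unit separator) + "<object name>".
 * demo_hash() is a placeholder for ceph_str_hash(pi->object_hash, ...).
 */
static u32 demo_ns_seed(const char *ns, size_t nsl,
			const char *name, size_t namel,
			u32 (*demo_hash)(const char *, size_t))
{
	size_t total = nsl + 1 + namel;
	char *buf = kmalloc(total, GFP_NOIO);
	u32 seed = 0;

	if (buf) {
		memcpy(buf, ns, nsl);
		buf[nsl] = '\037';
		memcpy(buf + nsl + 1, name, namel);
		seed = demo_hash(buf, total);
		kfree(buf);
	}
	return seed;
}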
2426 
2427 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2428                   const struct ceph_object_id *oid,
2429                   const struct ceph_object_locator *oloc,
2430                   struct ceph_pg *raw_pgid)
2431 {
2432     struct ceph_pg_pool_info *pi;
2433 
2434     pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2435     if (!pi)
2436         return -ENOENT;
2437 
2438     __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2439     return 0;
2440 }
2441 EXPORT_SYMBOL(ceph_object_locator_to_pg);
2442 
2443 /*
2444  * Map a raw PG (full precision ps) into an actual PG.
2445  */
2446 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
2447              const struct ceph_pg *raw_pgid,
2448              struct ceph_pg *pgid)
2449 {
2450     pgid->pool = raw_pgid->pool;
2451     pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
2452                      pi->pg_num_mask);
2453 }
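
/*
 * Worked example: ceph_stable_mod() (from the osdmap header) folds a
 * full-precision seed onto pg_num actual PGs even when pg_num is not a
 * power of two.  With pg_num = 12 (pg_num_mask = 15): raw seed 9 gives
 * 9 & 15 = 9, which is < 12, so PG 9; raw seed 13 gives 13 & 15 = 13,
 * which is >= 12, so it folds to 13 & 7 = 5.
 */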
2454 
2455 /*
2456  * Map a raw PG (full precision ps) into a placement ps (placement
2457  * seed).  Include pool id in that value so that different pools don't
2458  * use the same seeds.
2459  */
2460 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
2461              const struct ceph_pg *raw_pgid)
2462 {
2463     if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
2464         /* hash pool id and seed so that pool PGs do not overlap */
2465         return crush_hash32_2(CRUSH_HASH_RJENKINS1,
2466                       ceph_stable_mod(raw_pgid->seed,
2467                               pi->pgp_num,
2468                               pi->pgp_num_mask),
2469                       raw_pgid->pool);
2470     } else {
2471         /*
2472          * legacy behavior: add ps and pool together.  this is
2473          * not a great approach because the PGs from each pool
2474          * will overlap on top of each other: 0.5 == 1.4 ==
2475          * 2.3 == ...
2476          */
2477         return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
2478                        pi->pgp_num_mask) +
2479                (unsigned)raw_pgid->pool;
2480     }
2481 }
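
/*
 * Worked numbers for the legacy branch: with pgp_num = 8 (mask 7),
 * pool 0 seed 5 yields 5 + 0 = 5 and pool 1 seed 4 yields 4 + 1 = 5,
 * so two different PGs feed CRUSH the same placement seed.  That is
 * the 0.5 == 1.4 == 2.3 overlap described above, which HASHPSPOOL
 * avoids by hashing the seed and the pool id together.
 */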
2482 
2483 /*
2484  * Magic value used for a "default" fallback choose_args, used if the
2485  * crush_choose_arg_map passed to do_crush() does not exist.  If this
2486  * also doesn't exist, fall back to canonical weights.
2487  */
2488 #define CEPH_DEFAULT_CHOOSE_ARGS    -1
2489 
2490 static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
2491             int *result, int result_max,
2492             const __u32 *weight, int weight_max,
2493             s64 choose_args_index)
2494 {
2495     struct crush_choose_arg_map *arg_map;
2496     struct crush_work *work;
2497     int r;
2498 
2499     BUG_ON(result_max > CEPH_PG_MAX_SIZE);
2500 
2501     arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2502                     choose_args_index);
2503     if (!arg_map)
2504         arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2505                         CEPH_DEFAULT_CHOOSE_ARGS);
2506 
2507     work = get_workspace(&map->crush_wsm, map->crush);
2508     r = crush_do_rule(map->crush, ruleno, x, result, result_max,
2509               weight, weight_max, work,
2510               arg_map ? arg_map->args : NULL);
2511     put_workspace(&map->crush_wsm, work);
2512     return r;
2513 }
2514 
2515 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2516                     struct ceph_pg_pool_info *pi,
2517                     struct ceph_osds *set)
2518 {
2519     int i;
2520 
2521     if (ceph_can_shift_osds(pi)) {
2522         int removed = 0;
2523 
2524         /* shift left */
2525         for (i = 0; i < set->size; i++) {
2526             if (!ceph_osd_exists(osdmap, set->osds[i])) {
2527                 removed++;
2528                 continue;
2529             }
2530             if (removed)
2531                 set->osds[i - removed] = set->osds[i];
2532         }
2533         set->size -= removed;
2534     } else {
2535         /* set nonexistent (dne) devices to NONE */
2536         for (i = 0; i < set->size; i++) {
2537             if (!ceph_osd_exists(osdmap, set->osds[i]))
2538                 set->osds[i] = CRUSH_ITEM_NONE;
2539         }
2540     }
2541 }
2542 
2543 /*
2544  * Calculate raw set (CRUSH output) for given PG and filter out
2545  * nonexistent OSDs.  ->primary is undefined for a raw set.
2546  *
2547  * Placement seed (CRUSH input) is returned through @ppps.
2548  */
2549 static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
2550                struct ceph_pg_pool_info *pi,
2551                const struct ceph_pg *raw_pgid,
2552                struct ceph_osds *raw,
2553                u32 *ppps)
2554 {
2555     u32 pps = raw_pg_to_pps(pi, raw_pgid);
2556     int ruleno;
2557     int len;
2558 
2559     ceph_osds_init(raw);
2560     if (ppps)
2561         *ppps = pps;
2562 
2563     ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
2564                  pi->size);
2565     if (ruleno < 0) {
2566         pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
2567                pi->id, pi->crush_ruleset, pi->type, pi->size);
2568         return;
2569     }
2570 
2571     if (pi->size > ARRAY_SIZE(raw->osds)) {
2572         pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
2573                pi->id, pi->crush_ruleset, pi->type, pi->size,
2574                ARRAY_SIZE(raw->osds));
2575         return;
2576     }
2577 
2578     len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
2579                osdmap->osd_weight, osdmap->max_osd, pi->id);
2580     if (len < 0) {
2581         pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
2582                len, ruleno, pi->id, pi->crush_ruleset, pi->type,
2583                pi->size);
2584         return;
2585     }
2586 
2587     raw->size = len;
2588     remove_nonexistent_osds(osdmap, pi, raw);
2589 }
2590 
2591 /* apply pg_upmap[_items] mappings */
2592 static void apply_upmap(struct ceph_osdmap *osdmap,
2593             const struct ceph_pg *pgid,
2594             struct ceph_osds *raw)
2595 {
2596     struct ceph_pg_mapping *pg;
2597     int i, j;
2598 
2599     pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
2600     if (pg) {
2601         /* make sure targets aren't marked out */
2602         for (i = 0; i < pg->pg_upmap.len; i++) {
2603             int osd = pg->pg_upmap.osds[i];
2604 
2605             if (osd != CRUSH_ITEM_NONE &&
2606                 osd < osdmap->max_osd &&
2607                 osdmap->osd_weight[osd] == 0) {
2608                 /* reject/ignore explicit mapping */
2609                 return;
2610             }
2611         }
2612         for (i = 0; i < pg->pg_upmap.len; i++)
2613             raw->osds[i] = pg->pg_upmap.osds[i];
2614         raw->size = pg->pg_upmap.len;
2615         /* check and apply pg_upmap_items, if any */
2616     }
2617 
2618     pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
2619     if (pg) {
2620         /*
2621          * Note: this approach does not allow a bidirectional swap,
2622          * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
2623          */
2624         for (i = 0; i < pg->pg_upmap_items.len; i++) {
2625             int from = pg->pg_upmap_items.from_to[i][0];
2626             int to = pg->pg_upmap_items.from_to[i][1];
2627             int pos = -1;
2628             bool exists = false;
2629 
2630             /* make sure replacement doesn't already appear */
2631             for (j = 0; j < raw->size; j++) {
2632                 int osd = raw->osds[j];
2633 
2634                 if (osd == to) {
2635                     exists = true;
2636                     break;
2637                 }
2638                 /* ignore mapping if target is marked out */
2639                 if (osd == from && pos < 0 &&
2640                     !(to != CRUSH_ITEM_NONE &&
2641                       to < osdmap->max_osd &&
2642                       osdmap->osd_weight[to] == 0)) {
2643                     pos = j;
2644                 }
2645             }
2646             if (!exists && pos >= 0)
2647                 raw->osds[pos] = to;
2648         }
2649     }
2650 }
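
/*
 * Worked example for pg_upmap_items: with raw set [0, 1, 2] and items
 * [[1, 4]], osd1 is found at pos 1, osd4 does not already appear, and
 * (assuming osd4 is not weighted out) the result is [0, 4, 2].  With
 * items [[1, 2], [2, 1]] nothing changes, since each "to" is already
 * present in the set: the bidirectional-swap limitation noted above.
 */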
2651 
2652 /*
2653  * Given raw set, calculate up set and up primary.  By definition of an
2654  * up set, the result won't contain nonexistent or down OSDs.
2655  *
2656  * This is done in-place - on return @set is the up set.  If it's
2657  * empty, ->primary will remain undefined.
2658  */
2659 static void raw_to_up_osds(struct ceph_osdmap *osdmap,
2660                struct ceph_pg_pool_info *pi,
2661                struct ceph_osds *set)
2662 {
2663     int i;
2664 
2665     /* ->primary is undefined for a raw set */
2666     BUG_ON(set->primary != -1);
2667 
2668     if (ceph_can_shift_osds(pi)) {
2669         int removed = 0;
2670 
2671         /* shift left */
2672         for (i = 0; i < set->size; i++) {
2673             if (ceph_osd_is_down(osdmap, set->osds[i])) {
2674                 removed++;
2675                 continue;
2676             }
2677             if (removed)
2678                 set->osds[i - removed] = set->osds[i];
2679         }
2680         set->size -= removed;
2681         if (set->size > 0)
2682             set->primary = set->osds[0];
2683     } else {
2684         /* set down/nonexistent (dne) devices to NONE */
2685         for (i = set->size - 1; i >= 0; i--) {
2686             if (ceph_osd_is_down(osdmap, set->osds[i]))
2687                 set->osds[i] = CRUSH_ITEM_NONE;
2688             else
2689                 set->primary = set->osds[i];
2690         }
2691     }
2692 }
2693 
2694 static void apply_primary_affinity(struct ceph_osdmap *osdmap,
2695                    struct ceph_pg_pool_info *pi,
2696                    u32 pps,
2697                    struct ceph_osds *up)
2698 {
2699     int i;
2700     int pos = -1;
2701 
2702     /*
2703      * Do we have any non-default primary_affinity values for these
2704      * osds?
2705      */
2706     if (!osdmap->osd_primary_affinity)
2707         return;
2708 
2709     for (i = 0; i < up->size; i++) {
2710         int osd = up->osds[i];
2711 
2712         if (osd != CRUSH_ITEM_NONE &&
2713             osdmap->osd_primary_affinity[osd] !=
2714                     CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2715             break;
2716         }
2717     }
2718     if (i == up->size)
2719         return;
2720 
2721     /*
2722      * Pick the primary.  Feed both the seed (for the pg) and the
2723      * osd into the hash/rng so that a proportional fraction of an
2724      * osd's pgs get rejected as primary.
2725      */
2726     for (i = 0; i < up->size; i++) {
2727         int osd = up->osds[i];
2728         u32 aff;
2729 
2730         if (osd == CRUSH_ITEM_NONE)
2731             continue;
2732 
2733         aff = osdmap->osd_primary_affinity[osd];
2734         if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2735             (crush_hash32_2(CRUSH_HASH_RJENKINS1,
2736                     pps, osd) >> 16) >= aff) {
2737             /*
2738              * We chose not to use this primary.  Note it
2739              * anyway as a fallback in case we don't pick
2740              * anyone else, but keep looking.
2741              */
2742             if (pos < 0)
2743                 pos = i;
2744         } else {
2745             pos = i;
2746             break;
2747         }
2748     }
2749     if (pos < 0)
2750         return;
2751 
2752     up->primary = up->osds[pos];
2753 
2754     if (ceph_can_shift_osds(pi) && pos > 0) {
2755         /* move the new primary to the front */
2756         for (i = pos; i > 0; i--)
2757             up->osds[i] = up->osds[i - 1];
2758         up->osds[0] = up->primary;
2759     }
2760 }
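
/*
 * Note on the selection above: affinity values are fixed-point
 * fractions of 0x10000 (the default), and the hash >> 16 term is an
 * effectively uniform 16-bit value per (pps, osd) pair.  An OSD with
 * affinity 0x8000 is therefore skipped as primary for roughly half of
 * its PGs, deterministically; the first skipped OSD is remembered as a
 * fallback so the PG never ends up without a primary.
 */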
2761 
2762 /*
2763  * Get pg_temp and primary_temp mappings for given PG.
2764  *
2765  * Note that a PG may have none, only pg_temp, only primary_temp or
2766  * both pg_temp and primary_temp mappings.  This means @temp isn't
2767  * always a valid OSD set on return: in the "only primary_temp" case,
2768  * @temp will have its ->primary >= 0 but ->size == 0.
2769  */
2770 static void get_temp_osds(struct ceph_osdmap *osdmap,
2771               struct ceph_pg_pool_info *pi,
2772               const struct ceph_pg *pgid,
2773               struct ceph_osds *temp)
2774 {
2775     struct ceph_pg_mapping *pg;
2776     int i;
2777 
2778     ceph_osds_init(temp);
2779 
2780     /* pg_temp? */
2781     pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
2782     if (pg) {
2783         for (i = 0; i < pg->pg_temp.len; i++) {
2784             if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2785                 if (ceph_can_shift_osds(pi))
2786                     continue;
2787 
2788                 temp->osds[temp->size++] = CRUSH_ITEM_NONE;
2789             } else {
2790                 temp->osds[temp->size++] = pg->pg_temp.osds[i];
2791             }
2792         }
2793 
2794         /* apply pg_temp's primary */
2795         for (i = 0; i < temp->size; i++) {
2796             if (temp->osds[i] != CRUSH_ITEM_NONE) {
2797                 temp->primary = temp->osds[i];
2798                 break;
2799             }
2800         }
2801     }
2802 
2803     /* primary_temp? */
2804     pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
2805     if (pg)
2806         temp->primary = pg->primary_temp.osd;
2807 }
2808 
2809 /*
2810  * Map a PG to its acting set as well as its up set.
2811  *
2812  * Acting set is used for data mapping purposes, while up set can be
2813  * recorded for detecting interval changes and deciding whether to
2814  * resend a request.
2815  */
2816 void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2817                    struct ceph_pg_pool_info *pi,
2818                    const struct ceph_pg *raw_pgid,
2819                    struct ceph_osds *up,
2820                    struct ceph_osds *acting)
2821 {
2822     struct ceph_pg pgid;
2823     u32 pps;
2824 
2825     WARN_ON(pi->id != raw_pgid->pool);
2826     raw_pg_to_pg(pi, raw_pgid, &pgid);
2827 
2828     pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2829     apply_upmap(osdmap, &pgid, up);
2830     raw_to_up_osds(osdmap, pi, up);
2831     apply_primary_affinity(osdmap, pi, pps, up);
2832     get_temp_osds(osdmap, pi, &pgid, acting);
2833     if (!acting->size) {
2834         memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2835         acting->size = up->size;
2836         if (acting->primary == -1)
2837             acting->primary = up->primary;
2838     }
2839     WARN_ON(!osds_valid(up) || !osds_valid(acting));
2840 }
2841 
2842 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
2843                   struct ceph_pg_pool_info *pi,
2844                   const struct ceph_pg *raw_pgid,
2845                   struct ceph_spg *spgid)
2846 {
2847     struct ceph_pg pgid;
2848     struct ceph_osds up, acting;
2849     int i;
2850 
2851     WARN_ON(pi->id != raw_pgid->pool);
2852     raw_pg_to_pg(pi, raw_pgid, &pgid);
2853 
2854     if (ceph_can_shift_osds(pi)) {
2855         spgid->pgid = pgid; /* struct */
2856         spgid->shard = CEPH_SPG_NOSHARD;
2857         return true;
2858     }
2859 
2860     ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
2861     for (i = 0; i < acting.size; i++) {
2862         if (acting.osds[i] == acting.primary) {
2863             spgid->pgid = pgid; /* struct */
2864             spgid->shard = i;
2865             return true;
2866         }
2867     }
2868 
2869     return false;
2870 }
2871 
2872 /*
2873  * Return acting primary for given PG, or -1 if none.
2874  */
2875 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2876                   const struct ceph_pg *raw_pgid)
2877 {
2878     struct ceph_pg_pool_info *pi;
2879     struct ceph_osds up, acting;
2880 
2881     pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2882     if (!pi)
2883         return -1;
2884 
2885     ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2886     return acting.primary;
2887 }
2888 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
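
/*
 * A sketch tying the helpers together: "which OSD currently serves
 * this object?".  The function name is made up and error handling is
 * compressed; negative returns mix -ENOMEM/-ENOENT with the "no
 * primary" -1, which a real caller would keep apart.
 */
static int demo_who_serves(struct ceph_osdmap *map, s64 pool,
			   const char *name)
{
	struct ceph_object_locator oloc = { .pool = pool };
	struct ceph_object_id oid;
	struct ceph_pg raw_pgid;
	int ret;

	ceph_oid_init(&oid);
	ret = ceph_oid_aprintf(&oid, GFP_NOIO, "%s", name);
	if (ret)
		return ret;

	ret = ceph_object_locator_to_pg(map, &oid, &oloc, &raw_pgid);
	if (!ret)
		ret = ceph_pg_to_acting_primary(map, &raw_pgid);
	ceph_oid_destroy(&oid);
	return ret;
}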
2889 
2890 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
2891                           size_t name_len)
2892 {
2893     struct crush_loc_node *loc;
2894 
2895     loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
2896     if (!loc)
2897         return NULL;
2898 
2899     RB_CLEAR_NODE(&loc->cl_node);
2900     return loc;
2901 }
2902 
2903 static void free_crush_loc(struct crush_loc_node *loc)
2904 {
2905     WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
2906 
2907     kfree(loc);
2908 }
2909 
2910 static int crush_loc_compare(const struct crush_loc *loc1,
2911                  const struct crush_loc *loc2)
2912 {
2913     return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2914            strcmp(loc1->cl_name, loc2->cl_name);
2915 }
2916 
2917 DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
2918          RB_BYPTR, const struct crush_loc *, cl_node)
2919 
2920 /*
2921  * Parses a set of <bucket type name>':'<bucket name> pairs separated
2922  * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2923  *
2924  * Note that @crush_location is modified by strsep().
2925  */
2926 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
2927 {
2928     struct crush_loc_node *loc;
2929     const char *type_name, *name, *colon;
2930     size_t type_name_len, name_len;
2931 
2932     dout("%s '%s'\n", __func__, crush_location);
2933     while ((type_name = strsep(&crush_location, "|"))) {
2934         colon = strchr(type_name, ':');
2935         if (!colon)
2936             return -EINVAL;
2937 
2938         type_name_len = colon - type_name;
2939         if (type_name_len == 0)
2940             return -EINVAL;
2941 
2942         name = colon + 1;
2943         name_len = strlen(name);
2944         if (name_len == 0)
2945             return -EINVAL;
2946 
2947         loc = alloc_crush_loc(type_name_len, name_len);
2948         if (!loc)
2949             return -ENOMEM;
2950 
2951         loc->cl_loc.cl_type_name = loc->cl_data;
2952         memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
2953         loc->cl_loc.cl_type_name[type_name_len] = '\0';
2954 
2955         loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
2956         memcpy(loc->cl_loc.cl_name, name, name_len);
2957         loc->cl_loc.cl_name[name_len] = '\0';
2958 
2959         if (!__insert_crush_loc(locs, loc)) {
2960             free_crush_loc(loc);
2961             return -EEXIST;
2962         }
2963 
2964         dout("%s type_name '%s' name '%s'\n", __func__,
2965              loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
2966     }
2967 
2968     return 0;
2969 }
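
/*
 * Illustrative use (this parser backs libceph's crush_location
 * option): note the kstrdup(), since the input is consumed in place
 * by strsep().
 */
static int demo_parse_location(struct rb_root *locs)
{
	char *s = kstrdup("rack:foo1|rack:foo2|datacenter:bar", GFP_KERNEL);
	int ret;

	if (!s)
		return -ENOMEM;

	ret = ceph_parse_crush_location(s, locs);
	kfree(s);
	if (ret)
		ceph_clear_crush_locs(locs);	/* drop any partial parse */
	return ret;
}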
2970 
2971 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2972 {
2973     struct rb_node *n1 = rb_first(locs1);
2974     struct rb_node *n2 = rb_first(locs2);
2975     int ret;
2976 
2977     for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2978         struct crush_loc_node *loc1 =
2979             rb_entry(n1, struct crush_loc_node, cl_node);
2980         struct crush_loc_node *loc2 =
2981             rb_entry(n2, struct crush_loc_node, cl_node);
2982 
2983         ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2984         if (ret)
2985             return ret;
2986     }
2987 
2988     if (!n1 && n2)
2989         return -1;
2990     if (n1 && !n2)
2991         return 1;
2992     return 0;
2993 }
2994 
2995 void ceph_clear_crush_locs(struct rb_root *locs)
2996 {
2997     while (!RB_EMPTY_ROOT(locs)) {
2998         struct crush_loc_node *loc =
2999             rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
3000 
3001         erase_crush_loc(locs, loc);
3002         free_crush_loc(loc);
3003     }
3004 }
3005 
3006 /*
3007  * [a-zA-Z0-9-_.]+
3008  */
3009 static bool is_valid_crush_name(const char *name)
3010 {
3011     do {
3012         if (!('a' <= *name && *name <= 'z') &&
3013             !('A' <= *name && *name <= 'Z') &&
3014             !('0' <= *name && *name <= '9') &&
3015             *name != '-' && *name != '_' && *name != '.')
3016             return false;
3017     } while (*++name != '\0');
3018 
3019     return true;
3020 }
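
/*
 * Note: the allowed set above deliberately excludes '~'.  Per-class
 * "shadow" buckets are conventionally named with a '~' suffix (e.g.
 * "default~hdd"), which is how get_immediate_parent() below skips the
 * shadow hierarchy using this check.
 */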
3021 
3022 /*
3023  * Gets the parent of an item.  Returns its id (<0 because the
3024  * parent is always a bucket), type id (>0 for the same reason,
3025  * via @parent_type_id) and location (via @parent_loc).  If no
3026  * parent, returns 0.
3027  *
3028  * Does a linear search, as there are no parent pointers of any
3029  * kind.  Note that the result is ambiguous for items that occur
3030  * multiple times in the map.
3031  */
3032 static int get_immediate_parent(struct crush_map *c, int id,
3033                 u16 *parent_type_id,
3034                 struct crush_loc *parent_loc)
3035 {
3036     struct crush_bucket *b;
3037     struct crush_name_node *type_cn, *cn;
3038     int i, j;
3039 
3040     for (i = 0; i < c->max_buckets; i++) {
3041         b = c->buckets[i];
3042         if (!b)
3043             continue;
3044 
3045         /* ignore per-class shadow hierarchy */
3046         cn = lookup_crush_name(&c->names, b->id);
3047         if (!cn || !is_valid_crush_name(cn->cn_name))
3048             continue;
3049 
3050         for (j = 0; j < b->size; j++) {
3051             if (b->items[j] != id)
3052                 continue;
3053 
3054             *parent_type_id = b->type;
3055             type_cn = lookup_crush_name(&c->type_names, b->type);
3056             parent_loc->cl_type_name = type_cn->cn_name;
3057             parent_loc->cl_name = cn->cn_name;
3058             return b->id;
3059         }
3060     }
3061 
3062     return 0;  /* no parent */
3063 }
3064 
3065 /*
3066  * Calculates the locality/distance from an item to a client
3067  * location expressed in terms of CRUSH hierarchy as a set of
3068  * (bucket type name, bucket name) pairs.  Specifically, looks
3069  * for the lowest-valued bucket type for which the location of
3070  * @id matches one of the locations in @locs, so for standard
3071  * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
3072  * a matching host is closer than a matching rack and a matching
3073  * data center is closer than a matching zone.
3074  *
3075  * Specifying multiple locations (a "multipath" location) such
3076  * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
3077  * is a multimap.  The locality will be:
3078  *
3079  * - 3 for OSDs in racks foo1 and foo2
3080  * - 8 for OSDs in data center bar
3081  * - -1 for all other OSDs
3082  *
3083  * The lowest possible bucket type is 1, so the best locality
3084  * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
3085  * the OSD itself.
3086  */
3087 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
3088                 struct rb_root *locs)
3089 {
3090     struct crush_loc loc;
3091     u16 type_id;
3092 
3093     /*
3094      * Instead of repeated get_immediate_parent() calls,
3095      * the location of @id could be obtained with a single
3096      * depth-first traversal.
3097      */
3098     for (;;) {
3099         id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
3100         if (id >= 0)
3101             return -1;  /* not local */
3102 
3103         if (lookup_crush_loc(locs, &loc))
3104             return type_id;
3105     }
3106 }
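
/*
 * A sketch combining the two helpers above to rank an OSD against the
 * client's position in the CRUSH hierarchy (the location string is a
 * made-up example).  Lower is closer; -1 means no matching ancestor.
 */
static int demo_osd_locality(struct ceph_osdmap *map, int osd)
{
	struct rb_root locs = RB_ROOT;
	char *s = kstrdup("host:client-host|rack:client-rack", GFP_KERNEL);
	int locality = -1;

	if (s && !ceph_parse_crush_location(s, &locs))
		locality = ceph_get_crush_locality(map, osd, &locs);

	ceph_clear_crush_locs(&locs);
	kfree(s);
	return locality;
}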