Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 #include <linux/ceph/ceph_debug.h>
0003 
0004 #include <linux/module.h>
0005 #include <linux/types.h>
0006 #include <linux/slab.h>
0007 #include <linux/random.h>
0008 #include <linux/sched.h>
0009 
0010 #include <linux/ceph/ceph_features.h>
0011 #include <linux/ceph/mon_client.h>
0012 #include <linux/ceph/libceph.h>
0013 #include <linux/ceph/debugfs.h>
0014 #include <linux/ceph/decode.h>
0015 #include <linux/ceph/auth.h>
0016 
0017 /*
0018  * Interact with Ceph monitor cluster.  Handle requests for new map
0019  * versions, and periodically resend as needed.  Also implement
0020  * statfs() and umount().
0021  *
0022  * A small cluster of Ceph "monitors" are responsible for managing critical
0023  * cluster configuration and state information.  An odd number (e.g., 3, 5)
0024  * of cmon daemons use a modified version of the Paxos part-time parliament
0025  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
0026  * list of clients who have mounted the file system.
0027  *
0028  * We maintain an open, active session with a monitor at all times in order to
0029  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
0030  * TCP socket to ensure we detect a failure.  If the connection does break, we
0031  * randomly hunt for a new monitor.  Once the connection is reestablished, we
0032  * resend any outstanding requests.
0033  */
0034 
0035 static const struct ceph_connection_operations mon_con_ops;
0036 
0037 static int __validate_auth(struct ceph_mon_client *monc);
0038 
0039 static int decode_mon_info(void **p, void *end, bool msgr2,
0040                struct ceph_entity_addr *addr)
0041 {
0042     void *mon_info_end;
0043     u32 struct_len;
0044     u8 struct_v;
0045     int ret;
0046 
0047     ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
0048                   &struct_len);
0049     if (ret)
0050         return ret;
0051 
0052     mon_info_end = *p + struct_len;
0053     ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
0054     ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
0055     if (ret)
0056         return ret;
0057 
0058     *p = mon_info_end;
0059     return 0;
0060 
0061 e_inval:
0062     return -EINVAL;
0063 }
0064 
0065 /*
0066  * Decode a monmap blob (e.g., during mount).
0067  *
0068  * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
0069  */
0070 static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
0071 {
0072     struct ceph_monmap *monmap = NULL;
0073     struct ceph_fsid fsid;
0074     u32 struct_len;
0075     int blob_len;
0076     int num_mon;
0077     u8 struct_v;
0078     u32 epoch;
0079     int ret;
0080     int i;
0081 
0082     ceph_decode_32_safe(p, end, blob_len, e_inval);
0083     ceph_decode_need(p, end, blob_len, e_inval);
0084 
0085     ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
0086     if (ret)
0087         goto fail;
0088 
0089     dout("%s struct_v %d\n", __func__, struct_v);
0090     ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
0091     ceph_decode_32_safe(p, end, epoch, e_inval);
0092     if (struct_v >= 6) {
0093         u32 feat_struct_len;
0094         u8 feat_struct_v;
0095 
0096         *p += sizeof(struct ceph_timespec);  /* skip last_changed */
0097         *p += sizeof(struct ceph_timespec);  /* skip created */
0098 
0099         ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
0100                       &feat_struct_v, &feat_struct_len);
0101         if (ret)
0102             goto fail;
0103 
0104         *p += feat_struct_len;  /* skip persistent_features */
0105 
0106         ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
0107                       &feat_struct_v, &feat_struct_len);
0108         if (ret)
0109             goto fail;
0110 
0111         *p += feat_struct_len;  /* skip optional_features */
0112     }
0113     ceph_decode_32_safe(p, end, num_mon, e_inval);
0114 
0115     dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
0116          num_mon);
0117     if (num_mon > CEPH_MAX_MON)
0118         goto e_inval;
0119 
0120     monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
0121     if (!monmap) {
0122         ret = -ENOMEM;
0123         goto fail;
0124     }
0125     monmap->fsid = fsid;
0126     monmap->epoch = epoch;
0127     monmap->num_mon = num_mon;
0128 
0129     /* legacy_mon_addr map or mon_info map */
0130     for (i = 0; i < num_mon; i++) {
0131         struct ceph_entity_inst *inst = &monmap->mon_inst[i];
0132 
0133         ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
0134         inst->name.type = CEPH_ENTITY_TYPE_MON;
0135         inst->name.num = cpu_to_le64(i);
0136 
0137         if (struct_v >= 6)
0138             ret = decode_mon_info(p, end, msgr2, &inst->addr);
0139         else
0140             ret = ceph_decode_entity_addr(p, end, &inst->addr);
0141         if (ret)
0142             goto fail;
0143 
0144         dout("%s mon%d addr %s\n", __func__, i,
0145              ceph_pr_addr(&inst->addr));
0146     }
0147 
0148     return monmap;
0149 
0150 e_inval:
0151     ret = -EINVAL;
0152 fail:
0153     kfree(monmap);
0154     return ERR_PTR(ret);
0155 }
0156 
0157 /*
0158  * return true if *addr is included in the monmap.
0159  */
0160 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
0161 {
0162     int i;
0163 
0164     for (i = 0; i < m->num_mon; i++) {
0165         if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
0166             return 1;
0167     }
0168 
0169     return 0;
0170 }
0171 
0172 /*
0173  * Send an auth request.
0174  */
0175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
0176 {
0177     monc->pending_auth = 1;
0178     monc->m_auth->front.iov_len = len;
0179     monc->m_auth->hdr.front_len = cpu_to_le32(len);
0180     ceph_msg_revoke(monc->m_auth);
0181     ceph_msg_get(monc->m_auth);  /* keep our ref */
0182     ceph_con_send(&monc->con, monc->m_auth);
0183 }
0184 
0185 /*
0186  * Close monitor session, if any.
0187  */
0188 static void __close_session(struct ceph_mon_client *monc)
0189 {
0190     dout("__close_session closing mon%d\n", monc->cur_mon);
0191     ceph_msg_revoke(monc->m_auth);
0192     ceph_msg_revoke_incoming(monc->m_auth_reply);
0193     ceph_msg_revoke(monc->m_subscribe);
0194     ceph_msg_revoke_incoming(monc->m_subscribe_ack);
0195     ceph_con_close(&monc->con);
0196 
0197     monc->pending_auth = 0;
0198     ceph_auth_reset(monc->auth);
0199 }
0200 
0201 /*
0202  * Pick a new monitor at random and set cur_mon.  If we are repicking
0203  * (i.e. cur_mon is already set), be sure to pick a different one.
0204  */
0205 static void pick_new_mon(struct ceph_mon_client *monc)
0206 {
0207     int old_mon = monc->cur_mon;
0208 
0209     BUG_ON(monc->monmap->num_mon < 1);
0210 
0211     if (monc->monmap->num_mon == 1) {
0212         monc->cur_mon = 0;
0213     } else {
0214         int max = monc->monmap->num_mon;
0215         int o = -1;
0216         int n;
0217 
0218         if (monc->cur_mon >= 0) {
0219             if (monc->cur_mon < monc->monmap->num_mon)
0220                 o = monc->cur_mon;
0221             if (o >= 0)
0222                 max--;
0223         }
0224 
0225         n = prandom_u32() % max;
0226         if (o >= 0 && n >= o)
0227             n++;
0228 
0229         monc->cur_mon = n;
0230     }
0231 
0232     dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
0233          monc->cur_mon, monc->monmap->num_mon);
0234 }
0235 
0236 /*
0237  * Open a session with a new monitor.
0238  */
0239 static void __open_session(struct ceph_mon_client *monc)
0240 {
0241     int ret;
0242 
0243     pick_new_mon(monc);
0244 
0245     monc->hunting = true;
0246     if (monc->had_a_connection) {
0247         monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
0248         if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
0249             monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
0250     }
0251 
0252     monc->sub_renew_after = jiffies; /* i.e., expired */
0253     monc->sub_renew_sent = 0;
0254 
0255     dout("%s opening mon%d\n", __func__, monc->cur_mon);
0256     ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
0257               &monc->monmap->mon_inst[monc->cur_mon].addr);
0258 
0259     /*
0260      * Queue a keepalive to ensure that in case of an early fault
0261      * the messenger doesn't put us into STANDBY state and instead
0262      * retries.  This also ensures that our timestamp is valid by
0263      * the time we finish hunting and delayed_work() checks it.
0264      */
0265     ceph_con_keepalive(&monc->con);
0266     if (ceph_msgr2(monc->client)) {
0267         monc->pending_auth = 1;
0268         return;
0269     }
0270 
0271     /* initiate authentication handshake */
0272     ret = ceph_auth_build_hello(monc->auth,
0273                     monc->m_auth->front.iov_base,
0274                     monc->m_auth->front_alloc_len);
0275     BUG_ON(ret <= 0);
0276     __send_prepared_auth_request(monc, ret);
0277 }
0278 
0279 static void reopen_session(struct ceph_mon_client *monc)
0280 {
0281     if (!monc->hunting)
0282         pr_info("mon%d %s session lost, hunting for new mon\n",
0283             monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
0284 
0285     __close_session(monc);
0286     __open_session(monc);
0287 }
0288 
0289 void ceph_monc_reopen_session(struct ceph_mon_client *monc)
0290 {
0291     mutex_lock(&monc->mutex);
0292     reopen_session(monc);
0293     mutex_unlock(&monc->mutex);
0294 }
0295 
0296 static void un_backoff(struct ceph_mon_client *monc)
0297 {
0298     monc->hunt_mult /= 2; /* reduce by 50% */
0299     if (monc->hunt_mult < 1)
0300         monc->hunt_mult = 1;
0301     dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
0302 }
0303 
0304 /*
0305  * Reschedule delayed work timer.
0306  */
0307 static void __schedule_delayed(struct ceph_mon_client *monc)
0308 {
0309     unsigned long delay;
0310 
0311     if (monc->hunting)
0312         delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
0313     else
0314         delay = CEPH_MONC_PING_INTERVAL;
0315 
0316     dout("__schedule_delayed after %lu\n", delay);
0317     mod_delayed_work(system_wq, &monc->delayed_work,
0318              round_jiffies_relative(delay));
0319 }
0320 
0321 const char *ceph_sub_str[] = {
0322     [CEPH_SUB_MONMAP] = "monmap",
0323     [CEPH_SUB_OSDMAP] = "osdmap",
0324     [CEPH_SUB_FSMAP]  = "fsmap.user",
0325     [CEPH_SUB_MDSMAP] = "mdsmap",
0326 };
0327 
0328 /*
0329  * Send subscribe request for one or more maps, according to
0330  * monc->subs.
0331  */
0332 static void __send_subscribe(struct ceph_mon_client *monc)
0333 {
0334     struct ceph_msg *msg = monc->m_subscribe;
0335     void *p = msg->front.iov_base;
0336     void *const end = p + msg->front_alloc_len;
0337     int num = 0;
0338     int i;
0339 
0340     dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
0341 
0342     BUG_ON(monc->cur_mon < 0);
0343 
0344     if (!monc->sub_renew_sent)
0345         monc->sub_renew_sent = jiffies | 1; /* never 0 */
0346 
0347     msg->hdr.version = cpu_to_le16(2);
0348 
0349     for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
0350         if (monc->subs[i].want)
0351             num++;
0352     }
0353     BUG_ON(num < 1); /* monmap sub is always there */
0354     ceph_encode_32(&p, num);
0355     for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
0356         char buf[32];
0357         int len;
0358 
0359         if (!monc->subs[i].want)
0360             continue;
0361 
0362         len = sprintf(buf, "%s", ceph_sub_str[i]);
0363         if (i == CEPH_SUB_MDSMAP &&
0364             monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
0365             len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
0366 
0367         dout("%s %s start %llu flags 0x%x\n", __func__, buf,
0368              le64_to_cpu(monc->subs[i].item.start),
0369              monc->subs[i].item.flags);
0370         ceph_encode_string(&p, end, buf, len);
0371         memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
0372         p += sizeof(monc->subs[i].item);
0373     }
0374 
0375     BUG_ON(p > end);
0376     msg->front.iov_len = p - msg->front.iov_base;
0377     msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
0378     ceph_msg_revoke(msg);
0379     ceph_con_send(&monc->con, ceph_msg_get(msg));
0380 }
0381 
0382 static void handle_subscribe_ack(struct ceph_mon_client *monc,
0383                  struct ceph_msg *msg)
0384 {
0385     unsigned int seconds;
0386     struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
0387 
0388     if (msg->front.iov_len < sizeof(*h))
0389         goto bad;
0390     seconds = le32_to_cpu(h->duration);
0391 
0392     mutex_lock(&monc->mutex);
0393     if (monc->sub_renew_sent) {
0394         /*
0395          * This is only needed for legacy (infernalis or older)
0396          * MONs -- see delayed_work().
0397          */
0398         monc->sub_renew_after = monc->sub_renew_sent +
0399                         (seconds >> 1) * HZ - 1;
0400         dout("%s sent %lu duration %d renew after %lu\n", __func__,
0401              monc->sub_renew_sent, seconds, monc->sub_renew_after);
0402         monc->sub_renew_sent = 0;
0403     } else {
0404         dout("%s sent %lu renew after %lu, ignoring\n", __func__,
0405              monc->sub_renew_sent, monc->sub_renew_after);
0406     }
0407     mutex_unlock(&monc->mutex);
0408     return;
0409 bad:
0410     pr_err("got corrupt subscribe-ack msg\n");
0411     ceph_msg_dump(msg);
0412 }
0413 
0414 /*
0415  * Register interest in a map
0416  *
0417  * @sub: one of CEPH_SUB_*
0418  * @epoch: X for "every map since X", or 0 for "just the latest"
0419  */
0420 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
0421                  u32 epoch, bool continuous)
0422 {
0423     __le64 start = cpu_to_le64(epoch);
0424     u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
0425 
0426     dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
0427          epoch, continuous);
0428 
0429     if (monc->subs[sub].want &&
0430         monc->subs[sub].item.start == start &&
0431         monc->subs[sub].item.flags == flags)
0432         return false;
0433 
0434     monc->subs[sub].item.start = start;
0435     monc->subs[sub].item.flags = flags;
0436     monc->subs[sub].want = true;
0437 
0438     return true;
0439 }
0440 
0441 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
0442             bool continuous)
0443 {
0444     bool need_request;
0445 
0446     mutex_lock(&monc->mutex);
0447     need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
0448     mutex_unlock(&monc->mutex);
0449 
0450     return need_request;
0451 }
0452 EXPORT_SYMBOL(ceph_monc_want_map);
0453 
0454 /*
0455  * Keep track of which maps we have
0456  *
0457  * @sub: one of CEPH_SUB_*
0458  */
0459 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
0460                 u32 epoch)
0461 {
0462     dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
0463 
0464     if (monc->subs[sub].want) {
0465         if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
0466             monc->subs[sub].want = false;
0467         else
0468             monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
0469     }
0470 
0471     monc->subs[sub].have = epoch;
0472 }
0473 
0474 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
0475 {
0476     mutex_lock(&monc->mutex);
0477     __ceph_monc_got_map(monc, sub, epoch);
0478     mutex_unlock(&monc->mutex);
0479 }
0480 EXPORT_SYMBOL(ceph_monc_got_map);
0481 
0482 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
0483 {
0484     mutex_lock(&monc->mutex);
0485     __send_subscribe(monc);
0486     mutex_unlock(&monc->mutex);
0487 }
0488 EXPORT_SYMBOL(ceph_monc_renew_subs);
0489 
0490 /*
0491  * Wait for an osdmap with a given epoch.
0492  *
0493  * @epoch: epoch to wait for
0494  * @timeout: in jiffies, 0 means "wait forever"
0495  */
0496 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
0497               unsigned long timeout)
0498 {
0499     unsigned long started = jiffies;
0500     long ret;
0501 
0502     mutex_lock(&monc->mutex);
0503     while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
0504         mutex_unlock(&monc->mutex);
0505 
0506         if (timeout && time_after_eq(jiffies, started + timeout))
0507             return -ETIMEDOUT;
0508 
0509         ret = wait_event_interruptible_timeout(monc->client->auth_wq,
0510                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
0511                      ceph_timeout_jiffies(timeout));
0512         if (ret < 0)
0513             return ret;
0514 
0515         mutex_lock(&monc->mutex);
0516     }
0517 
0518     mutex_unlock(&monc->mutex);
0519     return 0;
0520 }
0521 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
0522 
0523 /*
0524  * Open a session with a random monitor.  Request monmap and osdmap,
0525  * which are waited upon in __ceph_open_session().
0526  */
0527 int ceph_monc_open_session(struct ceph_mon_client *monc)
0528 {
0529     mutex_lock(&monc->mutex);
0530     __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
0531     __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
0532     __open_session(monc);
0533     __schedule_delayed(monc);
0534     mutex_unlock(&monc->mutex);
0535     return 0;
0536 }
0537 EXPORT_SYMBOL(ceph_monc_open_session);
0538 
0539 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
0540                  struct ceph_msg *msg)
0541 {
0542     struct ceph_client *client = monc->client;
0543     struct ceph_monmap *monmap;
0544     void *p, *end;
0545 
0546     mutex_lock(&monc->mutex);
0547 
0548     dout("handle_monmap\n");
0549     p = msg->front.iov_base;
0550     end = p + msg->front.iov_len;
0551 
0552     monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client));
0553     if (IS_ERR(monmap)) {
0554         pr_err("problem decoding monmap, %d\n",
0555                (int)PTR_ERR(monmap));
0556         ceph_msg_dump(msg);
0557         goto out;
0558     }
0559 
0560     if (ceph_check_fsid(client, &monmap->fsid) < 0) {
0561         kfree(monmap);
0562         goto out;
0563     }
0564 
0565     kfree(monc->monmap);
0566     monc->monmap = monmap;
0567 
0568     __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
0569     client->have_fsid = true;
0570 
0571 out:
0572     mutex_unlock(&monc->mutex);
0573     wake_up_all(&client->auth_wq);
0574 }
0575 
0576 /*
0577  * generic requests (currently statfs, mon_get_version)
0578  */
0579 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
0580 
0581 static void release_generic_request(struct kref *kref)
0582 {
0583     struct ceph_mon_generic_request *req =
0584         container_of(kref, struct ceph_mon_generic_request, kref);
0585 
0586     dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
0587          req->reply);
0588     WARN_ON(!RB_EMPTY_NODE(&req->node));
0589 
0590     if (req->reply)
0591         ceph_msg_put(req->reply);
0592     if (req->request)
0593         ceph_msg_put(req->request);
0594 
0595     kfree(req);
0596 }
0597 
0598 static void put_generic_request(struct ceph_mon_generic_request *req)
0599 {
0600     if (req)
0601         kref_put(&req->kref, release_generic_request);
0602 }
0603 
0604 static void get_generic_request(struct ceph_mon_generic_request *req)
0605 {
0606     kref_get(&req->kref);
0607 }
0608 
0609 static struct ceph_mon_generic_request *
0610 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
0611 {
0612     struct ceph_mon_generic_request *req;
0613 
0614     req = kzalloc(sizeof(*req), gfp);
0615     if (!req)
0616         return NULL;
0617 
0618     req->monc = monc;
0619     kref_init(&req->kref);
0620     RB_CLEAR_NODE(&req->node);
0621     init_completion(&req->completion);
0622 
0623     dout("%s greq %p\n", __func__, req);
0624     return req;
0625 }
0626 
0627 static void register_generic_request(struct ceph_mon_generic_request *req)
0628 {
0629     struct ceph_mon_client *monc = req->monc;
0630 
0631     WARN_ON(req->tid);
0632 
0633     get_generic_request(req);
0634     req->tid = ++monc->last_tid;
0635     insert_generic_request(&monc->generic_request_tree, req);
0636 }
0637 
0638 static void send_generic_request(struct ceph_mon_client *monc,
0639                  struct ceph_mon_generic_request *req)
0640 {
0641     WARN_ON(!req->tid);
0642 
0643     dout("%s greq %p tid %llu\n", __func__, req, req->tid);
0644     req->request->hdr.tid = cpu_to_le64(req->tid);
0645     ceph_con_send(&monc->con, ceph_msg_get(req->request));
0646 }
0647 
0648 static void __finish_generic_request(struct ceph_mon_generic_request *req)
0649 {
0650     struct ceph_mon_client *monc = req->monc;
0651 
0652     dout("%s greq %p tid %llu\n", __func__, req, req->tid);
0653     erase_generic_request(&monc->generic_request_tree, req);
0654 
0655     ceph_msg_revoke(req->request);
0656     ceph_msg_revoke_incoming(req->reply);
0657 }
0658 
/*
 * Unregister @req via __finish_generic_request() and then drop one
 * reference on it.
 */
static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);

	put_generic_request(req);
}
0664 
0665 static void complete_generic_request(struct ceph_mon_generic_request *req)
0666 {
0667     if (req->complete_cb)
0668         req->complete_cb(req);
0669     else
0670         complete_all(&req->completion);
0671     put_generic_request(req);
0672 }
0673 
0674 static void cancel_generic_request(struct ceph_mon_generic_request *req)
0675 {
0676     struct ceph_mon_client *monc = req->monc;
0677     struct ceph_mon_generic_request *lookup_req;
0678 
0679     dout("%s greq %p tid %llu\n", __func__, req, req->tid);
0680 
0681     mutex_lock(&monc->mutex);
0682     lookup_req = lookup_generic_request(&monc->generic_request_tree,
0683                         req->tid);
0684     if (lookup_req) {
0685         WARN_ON(lookup_req != req);
0686         finish_generic_request(req);
0687     }
0688 
0689     mutex_unlock(&monc->mutex);
0690 }
0691 
0692 static int wait_generic_request(struct ceph_mon_generic_request *req)
0693 {
0694     int ret;
0695 
0696     dout("%s greq %p tid %llu\n", __func__, req, req->tid);
0697     ret = wait_for_completion_interruptible(&req->completion);
0698     if (ret)
0699         cancel_generic_request(req);
0700     else
0701         ret = req->result; /* completed */
0702 
0703     return ret;
0704 }
0705 
0706 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
0707                      struct ceph_msg_header *hdr,
0708                      int *skip)
0709 {
0710     struct ceph_mon_client *monc = con->private;
0711     struct ceph_mon_generic_request *req;
0712     u64 tid = le64_to_cpu(hdr->tid);
0713     struct ceph_msg *m;
0714 
0715     mutex_lock(&monc->mutex);
0716     req = lookup_generic_request(&monc->generic_request_tree, tid);
0717     if (!req) {
0718         dout("get_generic_reply %lld dne\n", tid);
0719         *skip = 1;
0720         m = NULL;
0721     } else {
0722         dout("get_generic_reply %lld got %p\n", tid, req->reply);
0723         *skip = 0;
0724         m = ceph_msg_get(req->reply);
0725         /*
0726          * we don't need to track the connection reading into
0727          * this reply because we only have one open connection
0728          * at a time, ever.
0729          */
0730     }
0731     mutex_unlock(&monc->mutex);
0732     return m;
0733 }
0734 
0735 /*
0736  * statfs
0737  */
0738 static void handle_statfs_reply(struct ceph_mon_client *monc,
0739                 struct ceph_msg *msg)
0740 {
0741     struct ceph_mon_generic_request *req;
0742     struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
0743     u64 tid = le64_to_cpu(msg->hdr.tid);
0744 
0745     dout("%s msg %p tid %llu\n", __func__, msg, tid);
0746 
0747     if (msg->front.iov_len != sizeof(*reply))
0748         goto bad;
0749 
0750     mutex_lock(&monc->mutex);
0751     req = lookup_generic_request(&monc->generic_request_tree, tid);
0752     if (!req) {
0753         mutex_unlock(&monc->mutex);
0754         return;
0755     }
0756 
0757     req->result = 0;
0758     *req->u.st = reply->st; /* struct */
0759     __finish_generic_request(req);
0760     mutex_unlock(&monc->mutex);
0761 
0762     complete_generic_request(req);
0763     return;
0764 
0765 bad:
0766     pr_err("corrupt statfs reply, tid %llu\n", tid);
0767     ceph_msg_dump(msg);
0768 }
0769 
0770 /*
0771  * Do a synchronous statfs().
0772  */
0773 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
0774             struct ceph_statfs *buf)
0775 {
0776     struct ceph_mon_generic_request *req;
0777     struct ceph_mon_statfs *h;
0778     int ret = -ENOMEM;
0779 
0780     req = alloc_generic_request(monc, GFP_NOFS);
0781     if (!req)
0782         goto out;
0783 
0784     req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
0785                     true);
0786     if (!req->request)
0787         goto out;
0788 
0789     req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
0790     if (!req->reply)
0791         goto out;
0792 
0793     req->u.st = buf;
0794     req->request->hdr.version = cpu_to_le16(2);
0795 
0796     mutex_lock(&monc->mutex);
0797     register_generic_request(req);
0798     /* fill out request */
0799     h = req->request->front.iov_base;
0800     h->monhdr.have_version = 0;
0801     h->monhdr.session_mon = cpu_to_le16(-1);
0802     h->monhdr.session_mon_tid = 0;
0803     h->fsid = monc->monmap->fsid;
0804     h->contains_data_pool = (data_pool != CEPH_NOPOOL);
0805     h->data_pool = cpu_to_le64(data_pool);
0806     send_generic_request(monc, req);
0807     mutex_unlock(&monc->mutex);
0808 
0809     ret = wait_generic_request(req);
0810 out:
0811     put_generic_request(req);
0812     return ret;
0813 }
0814 EXPORT_SYMBOL(ceph_monc_do_statfs);
0815 
0816 static void handle_get_version_reply(struct ceph_mon_client *monc,
0817                      struct ceph_msg *msg)
0818 {
0819     struct ceph_mon_generic_request *req;
0820     u64 tid = le64_to_cpu(msg->hdr.tid);
0821     void *p = msg->front.iov_base;
0822     void *end = p + msg->front_alloc_len;
0823     u64 handle;
0824 
0825     dout("%s msg %p tid %llu\n", __func__, msg, tid);
0826 
0827     ceph_decode_need(&p, end, 2*sizeof(u64), bad);
0828     handle = ceph_decode_64(&p);
0829     if (tid != 0 && tid != handle)
0830         goto bad;
0831 
0832     mutex_lock(&monc->mutex);
0833     req = lookup_generic_request(&monc->generic_request_tree, handle);
0834     if (!req) {
0835         mutex_unlock(&monc->mutex);
0836         return;
0837     }
0838 
0839     req->result = 0;
0840     req->u.newest = ceph_decode_64(&p);
0841     __finish_generic_request(req);
0842     mutex_unlock(&monc->mutex);
0843 
0844     complete_generic_request(req);
0845     return;
0846 
0847 bad:
0848     pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
0849     ceph_msg_dump(msg);
0850 }
0851 
0852 static struct ceph_mon_generic_request *
0853 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
0854             ceph_monc_callback_t cb, u64 private_data)
0855 {
0856     struct ceph_mon_generic_request *req;
0857 
0858     req = alloc_generic_request(monc, GFP_NOIO);
0859     if (!req)
0860         goto err_put_req;
0861 
0862     req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
0863                     sizeof(u64) + sizeof(u32) + strlen(what),
0864                     GFP_NOIO, true);
0865     if (!req->request)
0866         goto err_put_req;
0867 
0868     req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
0869                   true);
0870     if (!req->reply)
0871         goto err_put_req;
0872 
0873     req->complete_cb = cb;
0874     req->private_data = private_data;
0875 
0876     mutex_lock(&monc->mutex);
0877     register_generic_request(req);
0878     {
0879         void *p = req->request->front.iov_base;
0880         void *const end = p + req->request->front_alloc_len;
0881 
0882         ceph_encode_64(&p, req->tid); /* handle */
0883         ceph_encode_string(&p, end, what, strlen(what));
0884         WARN_ON(p != end);
0885     }
0886     send_generic_request(monc, req);
0887     mutex_unlock(&monc->mutex);
0888 
0889     return req;
0890 
0891 err_put_req:
0892     put_generic_request(req);
0893     return ERR_PTR(-ENOMEM);
0894 }
0895 
0896 /*
0897  * Send MMonGetVersion and wait for the reply.
0898  *
0899  * @what: one of "mdsmap", "osdmap" or "monmap"
0900  */
0901 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
0902               u64 *newest)
0903 {
0904     struct ceph_mon_generic_request *req;
0905     int ret;
0906 
0907     req = __ceph_monc_get_version(monc, what, NULL, 0);
0908     if (IS_ERR(req))
0909         return PTR_ERR(req);
0910 
0911     ret = wait_generic_request(req);
0912     if (!ret)
0913         *newest = req->u.newest;
0914 
0915     put_generic_request(req);
0916     return ret;
0917 }
0918 EXPORT_SYMBOL(ceph_monc_get_version);
0919 
0920 /*
0921  * Send MMonGetVersion,
0922  *
0923  * @what: one of "mdsmap", "osdmap" or "monmap"
0924  */
0925 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
0926                 ceph_monc_callback_t cb, u64 private_data)
0927 {
0928     struct ceph_mon_generic_request *req;
0929 
0930     req = __ceph_monc_get_version(monc, what, cb, private_data);
0931     if (IS_ERR(req))
0932         return PTR_ERR(req);
0933 
0934     put_generic_request(req);
0935     return 0;
0936 }
0937 EXPORT_SYMBOL(ceph_monc_get_version_async);
0938 
0939 static void handle_command_ack(struct ceph_mon_client *monc,
0940                    struct ceph_msg *msg)
0941 {
0942     struct ceph_mon_generic_request *req;
0943     void *p = msg->front.iov_base;
0944     void *const end = p + msg->front_alloc_len;
0945     u64 tid = le64_to_cpu(msg->hdr.tid);
0946 
0947     dout("%s msg %p tid %llu\n", __func__, msg, tid);
0948 
0949     ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
0950                                 sizeof(u32), bad);
0951     p += sizeof(struct ceph_mon_request_header);
0952 
0953     mutex_lock(&monc->mutex);
0954     req = lookup_generic_request(&monc->generic_request_tree, tid);
0955     if (!req) {
0956         mutex_unlock(&monc->mutex);
0957         return;
0958     }
0959 
0960     req->result = ceph_decode_32(&p);
0961     __finish_generic_request(req);
0962     mutex_unlock(&monc->mutex);
0963 
0964     complete_generic_request(req);
0965     return;
0966 
0967 bad:
0968     pr_err("corrupt mon_command ack, tid %llu\n", tid);
0969     ceph_msg_dump(msg);
0970 }
0971 
0972 static __printf(2, 0)
0973 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
0974              va_list ap)
0975 {
0976     struct ceph_mon_generic_request *req;
0977     struct ceph_mon_command *h;
0978     int ret = -ENOMEM;
0979     int len;
0980 
0981     req = alloc_generic_request(monc, GFP_NOIO);
0982     if (!req)
0983         goto out;
0984 
0985     req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
0986     if (!req->request)
0987         goto out;
0988 
0989     req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
0990                   true);
0991     if (!req->reply)
0992         goto out;
0993 
0994     mutex_lock(&monc->mutex);
0995     register_generic_request(req);
0996     h = req->request->front.iov_base;
0997     h->monhdr.have_version = 0;
0998     h->monhdr.session_mon = cpu_to_le16(-1);
0999     h->monhdr.session_mon_tid = 0;
1000     h->fsid = monc->monmap->fsid;
1001     h->num_strs = cpu_to_le32(1);
1002     len = vsprintf(h->str, fmt, ap);
1003     h->str_len = cpu_to_le32(len);
1004     send_generic_request(monc, req);
1005     mutex_unlock(&monc->mutex);
1006 
1007     ret = wait_generic_request(req);
1008 out:
1009     put_generic_request(req);
1010     return ret;
1011 }
1012 
1013 static __printf(2, 3)
1014 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
1015 {
1016     va_list ap;
1017     int ret;
1018 
1019     va_start(ap, fmt);
1020     ret = do_mon_command_vargs(monc, fmt, ap);
1021     va_end(ap);
1022     return ret;
1023 }
1024 
1025 int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
1026                 struct ceph_entity_addr *client_addr)
1027 {
1028     int ret;
1029 
1030     ret = do_mon_command(monc,
1031                  "{ \"prefix\": \"osd blocklist\", \
1032                 \"blocklistop\": \"add\", \
1033                 \"addr\": \"%pISpc/%u\" }",
1034                  &client_addr->in_addr,
1035                  le32_to_cpu(client_addr->nonce));
1036     if (ret == -EINVAL) {
1037         /*
1038          * The monitor returns EINVAL on an unrecognized command.
1039          * Try the legacy command -- it is exactly the same except
1040          * for the name.
1041          */
1042         ret = do_mon_command(monc,
1043                      "{ \"prefix\": \"osd blacklist\", \
1044                     \"blacklistop\": \"add\", \
1045                     \"addr\": \"%pISpc/%u\" }",
1046                      &client_addr->in_addr,
1047                      le32_to_cpu(client_addr->nonce));
1048     }
1049     if (ret)
1050         return ret;
1051 
1052     /*
1053      * Make sure we have the osdmap that includes the blocklist
1054      * entry.  This is needed to ensure that the OSDs pick up the
1055      * new blocklist before processing any future requests from
1056      * this client.
1057      */
1058     return ceph_wait_for_latest_osdmap(monc->client, 0);
1059 }
1060 EXPORT_SYMBOL(ceph_monc_blocklist_add);
1061 
1062 /*
1063  * Resend pending generic requests.
1064  */
1065 static void __resend_generic_request(struct ceph_mon_client *monc)
1066 {
1067     struct ceph_mon_generic_request *req;
1068     struct rb_node *p;
1069 
1070     for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1071         req = rb_entry(p, struct ceph_mon_generic_request, node);
1072         ceph_msg_revoke(req->request);
1073         ceph_msg_revoke_incoming(req->reply);
1074         ceph_con_send(&monc->con, ceph_msg_get(req->request));
1075     }
1076 }
1077 
1078 /*
1079  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
1080  * renew/retry subscription as needed (in case it is timing out, or we
1081  * got an ENOMEM).  And keep the monitor connection alive.
1082  */
1083 static void delayed_work(struct work_struct *work)
1084 {
1085     struct ceph_mon_client *monc =
1086         container_of(work, struct ceph_mon_client, delayed_work.work);
1087 
1088     dout("monc delayed_work\n");
1089     mutex_lock(&monc->mutex);
1090     if (monc->hunting) {
1091         dout("%s continuing hunt\n", __func__);
1092         reopen_session(monc);
1093     } else {
1094         int is_auth = ceph_auth_is_authenticated(monc->auth);
1095         if (ceph_con_keepalive_expired(&monc->con,
1096                            CEPH_MONC_PING_TIMEOUT)) {
1097             dout("monc keepalive timeout\n");
1098             is_auth = 0;
1099             reopen_session(monc);
1100         }
1101 
1102         if (!monc->hunting) {
1103             ceph_con_keepalive(&monc->con);
1104             __validate_auth(monc);
1105             un_backoff(monc);
1106         }
1107 
1108         if (is_auth &&
1109             !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1110             unsigned long now = jiffies;
1111 
1112             dout("%s renew subs? now %lu renew after %lu\n",
1113                  __func__, now, monc->sub_renew_after);
1114             if (time_after_eq(now, monc->sub_renew_after))
1115                 __send_subscribe(monc);
1116         }
1117     }
1118     __schedule_delayed(monc);
1119     mutex_unlock(&monc->mutex);
1120 }
1121 
1122 /*
1123  * On startup, we build a temporary monmap populated with the IPs
1124  * provided by mount(2).
1125  */
1126 static int build_initial_monmap(struct ceph_mon_client *monc)
1127 {
1128     __le32 my_type = ceph_msgr2(monc->client) ?
1129         CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY;
1130     struct ceph_options *opt = monc->client->options;
1131     int num_mon = opt->num_mon;
1132     int i;
1133 
1134     /* build initial monmap */
1135     monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1136                    GFP_KERNEL);
1137     if (!monc->monmap)
1138         return -ENOMEM;
1139 
1140     for (i = 0; i < num_mon; i++) {
1141         struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
1142 
1143         memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr,
1144                sizeof(inst->addr.in_addr));
1145         inst->addr.type = my_type;
1146         inst->addr.nonce = 0;
1147         inst->name.type = CEPH_ENTITY_TYPE_MON;
1148         inst->name.num = cpu_to_le64(i);
1149     }
1150     monc->monmap->num_mon = num_mon;
1151     return 0;
1152 }
1153 
1154 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1155 {
1156     int err;
1157 
1158     dout("init\n");
1159     memset(monc, 0, sizeof(*monc));
1160     monc->client = cl;
1161     mutex_init(&monc->mutex);
1162 
1163     err = build_initial_monmap(monc);
1164     if (err)
1165         goto out;
1166 
1167     /* connection */
1168     /* authentication */
1169     monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
1170                     cl->options->con_modes);
1171     if (IS_ERR(monc->auth)) {
1172         err = PTR_ERR(monc->auth);
1173         goto out_monmap;
1174     }
1175     monc->auth->want_keys =
1176         CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1177         CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1178 
1179     /* msgs */
1180     err = -ENOMEM;
1181     monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1182                      sizeof(struct ceph_mon_subscribe_ack),
1183                      GFP_KERNEL, true);
1184     if (!monc->m_subscribe_ack)
1185         goto out_auth;
1186 
1187     monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1188                      GFP_KERNEL, true);
1189     if (!monc->m_subscribe)
1190         goto out_subscribe_ack;
1191 
1192     monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1193                       GFP_KERNEL, true);
1194     if (!monc->m_auth_reply)
1195         goto out_subscribe;
1196 
1197     monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1198     monc->pending_auth = 0;
1199     if (!monc->m_auth)
1200         goto out_auth_reply;
1201 
1202     ceph_con_init(&monc->con, monc, &mon_con_ops,
1203               &monc->client->msgr);
1204 
1205     monc->cur_mon = -1;
1206     monc->had_a_connection = false;
1207     monc->hunt_mult = 1;
1208 
1209     INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1210     monc->generic_request_tree = RB_ROOT;
1211     monc->last_tid = 0;
1212 
1213     monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1214 
1215     return 0;
1216 
1217 out_auth_reply:
1218     ceph_msg_put(monc->m_auth_reply);
1219 out_subscribe:
1220     ceph_msg_put(monc->m_subscribe);
1221 out_subscribe_ack:
1222     ceph_msg_put(monc->m_subscribe_ack);
1223 out_auth:
1224     ceph_auth_destroy(monc->auth);
1225 out_monmap:
1226     kfree(monc->monmap);
1227 out:
1228     return err;
1229 }
1230 EXPORT_SYMBOL(ceph_monc_init);
1231 
1232 void ceph_monc_stop(struct ceph_mon_client *monc)
1233 {
1234     dout("stop\n");
1235     cancel_delayed_work_sync(&monc->delayed_work);
1236 
1237     mutex_lock(&monc->mutex);
1238     __close_session(monc);
1239     monc->cur_mon = -1;
1240     mutex_unlock(&monc->mutex);
1241 
1242     /*
1243      * flush msgr queue before we destroy ourselves to ensure that:
1244      *  - any work that references our embedded con is finished.
1245      *  - any osd_client or other work that may reference an authorizer
1246      *    finishes before we shut down the auth subsystem.
1247      */
1248     ceph_msgr_flush();
1249 
1250     ceph_auth_destroy(monc->auth);
1251 
1252     WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1253 
1254     ceph_msg_put(monc->m_auth);
1255     ceph_msg_put(monc->m_auth_reply);
1256     ceph_msg_put(monc->m_subscribe);
1257     ceph_msg_put(monc->m_subscribe_ack);
1258 
1259     kfree(monc->monmap);
1260 }
1261 EXPORT_SYMBOL(ceph_monc_stop);
1262 
1263 static void finish_hunting(struct ceph_mon_client *monc)
1264 {
1265     if (monc->hunting) {
1266         dout("%s found mon%d\n", __func__, monc->cur_mon);
1267         monc->hunting = false;
1268         monc->had_a_connection = true;
1269         un_backoff(monc);
1270         __schedule_delayed(monc);
1271     }
1272 }
1273 
1274 static void finish_auth(struct ceph_mon_client *monc, int auth_err,
1275             bool was_authed)
1276 {
1277     dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
1278     WARN_ON(auth_err > 0);
1279 
1280     monc->pending_auth = 0;
1281     if (auth_err) {
1282         monc->client->auth_err = auth_err;
1283         wake_up_all(&monc->client->auth_wq);
1284         return;
1285     }
1286 
1287     if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
1288         dout("%s authenticated, starting session global_id %llu\n",
1289              __func__, monc->auth->global_id);
1290 
1291         monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1292         monc->client->msgr.inst.name.num =
1293                     cpu_to_le64(monc->auth->global_id);
1294 
1295         __send_subscribe(monc);
1296         __resend_generic_request(monc);
1297 
1298         pr_info("mon%d %s session established\n", monc->cur_mon,
1299             ceph_pr_addr(&monc->con.peer_addr));
1300     }
1301 }
1302 
1303 static void handle_auth_reply(struct ceph_mon_client *monc,
1304                   struct ceph_msg *msg)
1305 {
1306     bool was_authed;
1307     int ret;
1308 
1309     mutex_lock(&monc->mutex);
1310     was_authed = ceph_auth_is_authenticated(monc->auth);
1311     ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1312                      msg->front.iov_len,
1313                      monc->m_auth->front.iov_base,
1314                      monc->m_auth->front_alloc_len);
1315     if (ret > 0) {
1316         __send_prepared_auth_request(monc, ret);
1317     } else {
1318         finish_auth(monc, ret, was_authed);
1319         finish_hunting(monc);
1320     }
1321     mutex_unlock(&monc->mutex);
1322 }
1323 
1324 static int __validate_auth(struct ceph_mon_client *monc)
1325 {
1326     int ret;
1327 
1328     if (monc->pending_auth)
1329         return 0;
1330 
1331     ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1332                   monc->m_auth->front_alloc_len);
1333     if (ret <= 0)
1334         return ret; /* either an error, or no need to authenticate */
1335     __send_prepared_auth_request(monc, ret);
1336     return 0;
1337 }
1338 
1339 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1340 {
1341     int ret;
1342 
1343     mutex_lock(&monc->mutex);
1344     ret = __validate_auth(monc);
1345     mutex_unlock(&monc->mutex);
1346     return ret;
1347 }
1348 EXPORT_SYMBOL(ceph_monc_validate_auth);
1349 
1350 static int mon_get_auth_request(struct ceph_connection *con,
1351                 void *buf, int *buf_len,
1352                 void **authorizer, int *authorizer_len)
1353 {
1354     struct ceph_mon_client *monc = con->private;
1355     int ret;
1356 
1357     mutex_lock(&monc->mutex);
1358     ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
1359     mutex_unlock(&monc->mutex);
1360     if (ret < 0)
1361         return ret;
1362 
1363     *buf_len = ret;
1364     *authorizer = NULL;
1365     *authorizer_len = 0;
1366     return 0;
1367 }
1368 
1369 static int mon_handle_auth_reply_more(struct ceph_connection *con,
1370                       void *reply, int reply_len,
1371                       void *buf, int *buf_len,
1372                       void **authorizer, int *authorizer_len)
1373 {
1374     struct ceph_mon_client *monc = con->private;
1375     int ret;
1376 
1377     mutex_lock(&monc->mutex);
1378     ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
1379                       buf, *buf_len);
1380     mutex_unlock(&monc->mutex);
1381     if (ret < 0)
1382         return ret;
1383 
1384     *buf_len = ret;
1385     *authorizer = NULL;
1386     *authorizer_len = 0;
1387     return 0;
1388 }
1389 
1390 static int mon_handle_auth_done(struct ceph_connection *con,
1391                 u64 global_id, void *reply, int reply_len,
1392                 u8 *session_key, int *session_key_len,
1393                 u8 *con_secret, int *con_secret_len)
1394 {
1395     struct ceph_mon_client *monc = con->private;
1396     bool was_authed;
1397     int ret;
1398 
1399     mutex_lock(&monc->mutex);
1400     WARN_ON(!monc->hunting);
1401     was_authed = ceph_auth_is_authenticated(monc->auth);
1402     ret = ceph_auth_handle_reply_done(monc->auth, global_id,
1403                       reply, reply_len,
1404                       session_key, session_key_len,
1405                       con_secret, con_secret_len);
1406     finish_auth(monc, ret, was_authed);
1407     if (!ret)
1408         finish_hunting(monc);
1409     mutex_unlock(&monc->mutex);
1410     return 0;
1411 }
1412 
1413 static int mon_handle_auth_bad_method(struct ceph_connection *con,
1414                       int used_proto, int result,
1415                       const int *allowed_protos, int proto_cnt,
1416                       const int *allowed_modes, int mode_cnt)
1417 {
1418     struct ceph_mon_client *monc = con->private;
1419     bool was_authed;
1420 
1421     mutex_lock(&monc->mutex);
1422     WARN_ON(!monc->hunting);
1423     was_authed = ceph_auth_is_authenticated(monc->auth);
1424     ceph_auth_handle_bad_method(monc->auth, used_proto, result,
1425                     allowed_protos, proto_cnt,
1426                     allowed_modes, mode_cnt);
1427     finish_auth(monc, -EACCES, was_authed);
1428     mutex_unlock(&monc->mutex);
1429     return 0;
1430 }
1431 
1432 /*
1433  * handle incoming message
1434  */
1435 static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1436 {
1437     struct ceph_mon_client *monc = con->private;
1438     int type = le16_to_cpu(msg->hdr.type);
1439 
1440     switch (type) {
1441     case CEPH_MSG_AUTH_REPLY:
1442         handle_auth_reply(monc, msg);
1443         break;
1444 
1445     case CEPH_MSG_MON_SUBSCRIBE_ACK:
1446         handle_subscribe_ack(monc, msg);
1447         break;
1448 
1449     case CEPH_MSG_STATFS_REPLY:
1450         handle_statfs_reply(monc, msg);
1451         break;
1452 
1453     case CEPH_MSG_MON_GET_VERSION_REPLY:
1454         handle_get_version_reply(monc, msg);
1455         break;
1456 
1457     case CEPH_MSG_MON_COMMAND_ACK:
1458         handle_command_ack(monc, msg);
1459         break;
1460 
1461     case CEPH_MSG_MON_MAP:
1462         ceph_monc_handle_map(monc, msg);
1463         break;
1464 
1465     case CEPH_MSG_OSD_MAP:
1466         ceph_osdc_handle_map(&monc->client->osdc, msg);
1467         break;
1468 
1469     default:
1470         /* can the chained handler handle it? */
1471         if (monc->client->extra_mon_dispatch &&
1472             monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1473             break;
1474 
1475         pr_err("received unknown message type %d %s\n", type,
1476                ceph_msg_type_name(type));
1477     }
1478     ceph_msg_put(msg);
1479 }
1480 
1481 /*
1482  * Allocate memory for incoming message
1483  */
1484 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1485                       struct ceph_msg_header *hdr,
1486                       int *skip)
1487 {
1488     struct ceph_mon_client *monc = con->private;
1489     int type = le16_to_cpu(hdr->type);
1490     int front_len = le32_to_cpu(hdr->front_len);
1491     struct ceph_msg *m = NULL;
1492 
1493     *skip = 0;
1494 
1495     switch (type) {
1496     case CEPH_MSG_MON_SUBSCRIBE_ACK:
1497         m = ceph_msg_get(monc->m_subscribe_ack);
1498         break;
1499     case CEPH_MSG_STATFS_REPLY:
1500     case CEPH_MSG_MON_COMMAND_ACK:
1501         return get_generic_reply(con, hdr, skip);
1502     case CEPH_MSG_AUTH_REPLY:
1503         m = ceph_msg_get(monc->m_auth_reply);
1504         break;
1505     case CEPH_MSG_MON_GET_VERSION_REPLY:
1506         if (le64_to_cpu(hdr->tid) != 0)
1507             return get_generic_reply(con, hdr, skip);
1508 
1509         /*
1510          * Older OSDs don't set reply tid even if the original
1511          * request had a non-zero tid.  Work around this weirdness
1512          * by allocating a new message.
1513          */
1514         fallthrough;
1515     case CEPH_MSG_MON_MAP:
1516     case CEPH_MSG_MDS_MAP:
1517     case CEPH_MSG_OSD_MAP:
1518     case CEPH_MSG_FS_MAP_USER:
1519         m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1520         if (!m)
1521             return NULL;    /* ENOMEM--return skip == 0 */
1522         break;
1523     }
1524 
1525     if (!m) {
1526         pr_info("alloc_msg unknown type %d\n", type);
1527         *skip = 1;
1528     } else if (front_len > m->front_alloc_len) {
1529         pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1530             front_len, m->front_alloc_len,
1531             (unsigned int)con->peer_name.type,
1532             le64_to_cpu(con->peer_name.num));
1533         ceph_msg_put(m);
1534         m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1535     }
1536 
1537     return m;
1538 }
1539 
1540 /*
1541  * If the monitor connection resets, pick a new monitor and resubmit
1542  * any pending requests.
1543  */
1544 static void mon_fault(struct ceph_connection *con)
1545 {
1546     struct ceph_mon_client *monc = con->private;
1547 
1548     mutex_lock(&monc->mutex);
1549     dout("%s mon%d\n", __func__, monc->cur_mon);
1550     if (monc->cur_mon >= 0) {
1551         if (!monc->hunting) {
1552             dout("%s hunting for new mon\n", __func__);
1553             reopen_session(monc);
1554             __schedule_delayed(monc);
1555         } else {
1556             dout("%s already hunting\n", __func__);
1557         }
1558     }
1559     mutex_unlock(&monc->mutex);
1560 }
1561 
/*
 * ->get connection callback.
 *
 * The connection struct is not refcounted here: every reference comes
 * from the messenger workqueue, and that is drained before the
 * mon_client is destroyed.  The connection is simply handed back.
 */
static struct ceph_connection *mon_get_con(struct ceph_connection *con)
{
    return con;
}
1571 
/*
 * ->put connection callback.  Intentionally empty: mon_get_con() takes no
 * reference, so there is nothing to drop.
 */
static void mon_put_con(struct ceph_connection *con)
{
}
1575 
1576 static const struct ceph_connection_operations mon_con_ops = {
1577     .get = mon_get_con,
1578     .put = mon_put_con,
1579     .alloc_msg = mon_alloc_msg,
1580     .dispatch = mon_dispatch,
1581     .fault = mon_fault,
1582     .get_auth_request = mon_get_auth_request,
1583     .handle_auth_reply_more = mon_handle_auth_reply_more,
1584     .handle_auth_done = mon_handle_auth_done,
1585     .handle_auth_bad_method = mon_handle_auth_bad_method,
1586 };