Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 #include <linux/ceph/ceph_debug.h>
0003 
0004 #include <linux/fs.h>
0005 #include <linux/wait.h>
0006 #include <linux/slab.h>
0007 #include <linux/gfp.h>
0008 #include <linux/sched.h>
0009 #include <linux/debugfs.h>
0010 #include <linux/seq_file.h>
0011 #include <linux/ratelimit.h>
0012 #include <linux/bits.h>
0013 #include <linux/ktime.h>
0014 #include <linux/bitmap.h>
0015 
0016 #include "super.h"
0017 #include "mds_client.h"
0018 
0019 #include <linux/ceph/ceph_features.h>
0020 #include <linux/ceph/messenger.h>
0021 #include <linux/ceph/decode.h>
0022 #include <linux/ceph/pagelist.h>
0023 #include <linux/ceph/auth.h>
0024 #include <linux/ceph/debugfs.h>
0025 
0026 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
0027 
0028 /*
0029  * A cluster of MDS (metadata server) daemons is responsible for
0030  * managing the file system namespace (the directory hierarchy and
0031  * inodes) and for coordinating shared access to storage.  Metadata is
0032  * partitioning hierarchically across a number of servers, and that
0033  * partition varies over time as the cluster adjusts the distribution
0034  * in order to balance load.
0035  *
0036  * The MDS client is primarily responsible to managing synchronous
0037  * metadata requests for operations like open, unlink, and so forth.
0038  * If there is a MDS failure, we find out about it when we (possibly
0039  * request and) receive a new MDS map, and can resubmit affected
0040  * requests.
0041  *
0042  * For the most part, though, we take advantage of a lossless
0043  * communications channel to the MDS, and do not need to worry about
0044  * timing out or resubmitting requests.
0045  *
0046  * We maintain a stateful "session" with each MDS we interact with.
0047  * Within each session, we sent periodic heartbeat messages to ensure
0048  * any capabilities or leases we have been issues remain valid.  If
0049  * the session times out and goes stale, our leases and capabilities
0050  * are no longer valid.
0051  */
0052 
0053 struct ceph_reconnect_state {
0054     struct ceph_mds_session *session;
0055     int nr_caps, nr_realms;
0056     struct ceph_pagelist *pagelist;
0057     unsigned msg_version;
0058     bool allow_multi;
0059 };
0060 
0061 static void __wake_requests(struct ceph_mds_client *mdsc,
0062                 struct list_head *head);
0063 static void ceph_cap_release_work(struct work_struct *work);
0064 static void ceph_cap_reclaim_work(struct work_struct *work);
0065 
0066 static const struct ceph_connection_operations mds_con_ops;
0067 
0068 
0069 /*
0070  * mds reply parsing
0071  */
0072 
0073 static int parse_reply_info_quota(void **p, void *end,
0074                   struct ceph_mds_reply_info_in *info)
0075 {
0076     u8 struct_v, struct_compat;
0077     u32 struct_len;
0078 
0079     ceph_decode_8_safe(p, end, struct_v, bad);
0080     ceph_decode_8_safe(p, end, struct_compat, bad);
0081     /* struct_v is expected to be >= 1. we only
0082      * understand encoding with struct_compat == 1. */
0083     if (!struct_v || struct_compat != 1)
0084         goto bad;
0085     ceph_decode_32_safe(p, end, struct_len, bad);
0086     ceph_decode_need(p, end, struct_len, bad);
0087     end = *p + struct_len;
0088     ceph_decode_64_safe(p, end, info->max_bytes, bad);
0089     ceph_decode_64_safe(p, end, info->max_files, bad);
0090     *p = end;
0091     return 0;
0092 bad:
0093     return -EIO;
0094 }
0095 
0096 /*
0097  * parse individual inode info
0098  */
0099 static int parse_reply_info_in(void **p, void *end,
0100                    struct ceph_mds_reply_info_in *info,
0101                    u64 features)
0102 {
0103     int err = 0;
0104     u8 struct_v = 0;
0105 
0106     if (features == (u64)-1) {
0107         u32 struct_len;
0108         u8 struct_compat;
0109         ceph_decode_8_safe(p, end, struct_v, bad);
0110         ceph_decode_8_safe(p, end, struct_compat, bad);
0111         /* struct_v is expected to be >= 1. we only understand
0112          * encoding with struct_compat == 1. */
0113         if (!struct_v || struct_compat != 1)
0114             goto bad;
0115         ceph_decode_32_safe(p, end, struct_len, bad);
0116         ceph_decode_need(p, end, struct_len, bad);
0117         end = *p + struct_len;
0118     }
0119 
0120     ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
0121     info->in = *p;
0122     *p += sizeof(struct ceph_mds_reply_inode) +
0123         sizeof(*info->in->fragtree.splits) *
0124         le32_to_cpu(info->in->fragtree.nsplits);
0125 
0126     ceph_decode_32_safe(p, end, info->symlink_len, bad);
0127     ceph_decode_need(p, end, info->symlink_len, bad);
0128     info->symlink = *p;
0129     *p += info->symlink_len;
0130 
0131     ceph_decode_copy_safe(p, end, &info->dir_layout,
0132                   sizeof(info->dir_layout), bad);
0133     ceph_decode_32_safe(p, end, info->xattr_len, bad);
0134     ceph_decode_need(p, end, info->xattr_len, bad);
0135     info->xattr_data = *p;
0136     *p += info->xattr_len;
0137 
0138     if (features == (u64)-1) {
0139         /* inline data */
0140         ceph_decode_64_safe(p, end, info->inline_version, bad);
0141         ceph_decode_32_safe(p, end, info->inline_len, bad);
0142         ceph_decode_need(p, end, info->inline_len, bad);
0143         info->inline_data = *p;
0144         *p += info->inline_len;
0145         /* quota */
0146         err = parse_reply_info_quota(p, end, info);
0147         if (err < 0)
0148             goto out_bad;
0149         /* pool namespace */
0150         ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
0151         if (info->pool_ns_len > 0) {
0152             ceph_decode_need(p, end, info->pool_ns_len, bad);
0153             info->pool_ns_data = *p;
0154             *p += info->pool_ns_len;
0155         }
0156 
0157         /* btime */
0158         ceph_decode_need(p, end, sizeof(info->btime), bad);
0159         ceph_decode_copy(p, &info->btime, sizeof(info->btime));
0160 
0161         /* change attribute */
0162         ceph_decode_64_safe(p, end, info->change_attr, bad);
0163 
0164         /* dir pin */
0165         if (struct_v >= 2) {
0166             ceph_decode_32_safe(p, end, info->dir_pin, bad);
0167         } else {
0168             info->dir_pin = -ENODATA;
0169         }
0170 
0171         /* snapshot birth time, remains zero for v<=2 */
0172         if (struct_v >= 3) {
0173             ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
0174             ceph_decode_copy(p, &info->snap_btime,
0175                      sizeof(info->snap_btime));
0176         } else {
0177             memset(&info->snap_btime, 0, sizeof(info->snap_btime));
0178         }
0179 
0180         /* snapshot count, remains zero for v<=3 */
0181         if (struct_v >= 4) {
0182             ceph_decode_64_safe(p, end, info->rsnaps, bad);
0183         } else {
0184             info->rsnaps = 0;
0185         }
0186 
0187         *p = end;
0188     } else {
0189         if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
0190             ceph_decode_64_safe(p, end, info->inline_version, bad);
0191             ceph_decode_32_safe(p, end, info->inline_len, bad);
0192             ceph_decode_need(p, end, info->inline_len, bad);
0193             info->inline_data = *p;
0194             *p += info->inline_len;
0195         } else
0196             info->inline_version = CEPH_INLINE_NONE;
0197 
0198         if (features & CEPH_FEATURE_MDS_QUOTA) {
0199             err = parse_reply_info_quota(p, end, info);
0200             if (err < 0)
0201                 goto out_bad;
0202         } else {
0203             info->max_bytes = 0;
0204             info->max_files = 0;
0205         }
0206 
0207         info->pool_ns_len = 0;
0208         info->pool_ns_data = NULL;
0209         if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
0210             ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
0211             if (info->pool_ns_len > 0) {
0212                 ceph_decode_need(p, end, info->pool_ns_len, bad);
0213                 info->pool_ns_data = *p;
0214                 *p += info->pool_ns_len;
0215             }
0216         }
0217 
0218         if (features & CEPH_FEATURE_FS_BTIME) {
0219             ceph_decode_need(p, end, sizeof(info->btime), bad);
0220             ceph_decode_copy(p, &info->btime, sizeof(info->btime));
0221             ceph_decode_64_safe(p, end, info->change_attr, bad);
0222         }
0223 
0224         info->dir_pin = -ENODATA;
0225         /* info->snap_btime and info->rsnaps remain zero */
0226     }
0227     return 0;
0228 bad:
0229     err = -EIO;
0230 out_bad:
0231     return err;
0232 }
0233 
0234 static int parse_reply_info_dir(void **p, void *end,
0235                 struct ceph_mds_reply_dirfrag **dirfrag,
0236                 u64 features)
0237 {
0238     if (features == (u64)-1) {
0239         u8 struct_v, struct_compat;
0240         u32 struct_len;
0241         ceph_decode_8_safe(p, end, struct_v, bad);
0242         ceph_decode_8_safe(p, end, struct_compat, bad);
0243         /* struct_v is expected to be >= 1. we only understand
0244          * encoding whose struct_compat == 1. */
0245         if (!struct_v || struct_compat != 1)
0246             goto bad;
0247         ceph_decode_32_safe(p, end, struct_len, bad);
0248         ceph_decode_need(p, end, struct_len, bad);
0249         end = *p + struct_len;
0250     }
0251 
0252     ceph_decode_need(p, end, sizeof(**dirfrag), bad);
0253     *dirfrag = *p;
0254     *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
0255     if (unlikely(*p > end))
0256         goto bad;
0257     if (features == (u64)-1)
0258         *p = end;
0259     return 0;
0260 bad:
0261     return -EIO;
0262 }
0263 
0264 static int parse_reply_info_lease(void **p, void *end,
0265                   struct ceph_mds_reply_lease **lease,
0266                   u64 features)
0267 {
0268     if (features == (u64)-1) {
0269         u8 struct_v, struct_compat;
0270         u32 struct_len;
0271         ceph_decode_8_safe(p, end, struct_v, bad);
0272         ceph_decode_8_safe(p, end, struct_compat, bad);
0273         /* struct_v is expected to be >= 1. we only understand
0274          * encoding whose struct_compat == 1. */
0275         if (!struct_v || struct_compat != 1)
0276             goto bad;
0277         ceph_decode_32_safe(p, end, struct_len, bad);
0278         ceph_decode_need(p, end, struct_len, bad);
0279         end = *p + struct_len;
0280     }
0281 
0282     ceph_decode_need(p, end, sizeof(**lease), bad);
0283     *lease = *p;
0284     *p += sizeof(**lease);
0285     if (features == (u64)-1)
0286         *p = end;
0287     return 0;
0288 bad:
0289     return -EIO;
0290 }
0291 
0292 /*
0293  * parse a normal reply, which may contain a (dir+)dentry and/or a
0294  * target inode.
0295  */
0296 static int parse_reply_info_trace(void **p, void *end,
0297                   struct ceph_mds_reply_info_parsed *info,
0298                   u64 features)
0299 {
0300     int err;
0301 
0302     if (info->head->is_dentry) {
0303         err = parse_reply_info_in(p, end, &info->diri, features);
0304         if (err < 0)
0305             goto out_bad;
0306 
0307         err = parse_reply_info_dir(p, end, &info->dirfrag, features);
0308         if (err < 0)
0309             goto out_bad;
0310 
0311         ceph_decode_32_safe(p, end, info->dname_len, bad);
0312         ceph_decode_need(p, end, info->dname_len, bad);
0313         info->dname = *p;
0314         *p += info->dname_len;
0315 
0316         err = parse_reply_info_lease(p, end, &info->dlease, features);
0317         if (err < 0)
0318             goto out_bad;
0319     }
0320 
0321     if (info->head->is_target) {
0322         err = parse_reply_info_in(p, end, &info->targeti, features);
0323         if (err < 0)
0324             goto out_bad;
0325     }
0326 
0327     if (unlikely(*p != end))
0328         goto bad;
0329     return 0;
0330 
0331 bad:
0332     err = -EIO;
0333 out_bad:
0334     pr_err("problem parsing mds trace %d\n", err);
0335     return err;
0336 }
0337 
0338 /*
0339  * parse readdir results
0340  */
0341 static int parse_reply_info_readdir(void **p, void *end,
0342                 struct ceph_mds_reply_info_parsed *info,
0343                 u64 features)
0344 {
0345     u32 num, i = 0;
0346     int err;
0347 
0348     err = parse_reply_info_dir(p, end, &info->dir_dir, features);
0349     if (err < 0)
0350         goto out_bad;
0351 
0352     ceph_decode_need(p, end, sizeof(num) + 2, bad);
0353     num = ceph_decode_32(p);
0354     {
0355         u16 flags = ceph_decode_16(p);
0356         info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
0357         info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
0358         info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
0359         info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
0360     }
0361     if (num == 0)
0362         goto done;
0363 
0364     BUG_ON(!info->dir_entries);
0365     if ((unsigned long)(info->dir_entries + num) >
0366         (unsigned long)info->dir_entries + info->dir_buf_size) {
0367         pr_err("dir contents are larger than expected\n");
0368         WARN_ON(1);
0369         goto bad;
0370     }
0371 
0372     info->dir_nr = num;
0373     while (num) {
0374         struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
0375         /* dentry */
0376         ceph_decode_32_safe(p, end, rde->name_len, bad);
0377         ceph_decode_need(p, end, rde->name_len, bad);
0378         rde->name = *p;
0379         *p += rde->name_len;
0380         dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
0381 
0382         /* dentry lease */
0383         err = parse_reply_info_lease(p, end, &rde->lease, features);
0384         if (err)
0385             goto out_bad;
0386         /* inode */
0387         err = parse_reply_info_in(p, end, &rde->inode, features);
0388         if (err < 0)
0389             goto out_bad;
0390         /* ceph_readdir_prepopulate() will update it */
0391         rde->offset = 0;
0392         i++;
0393         num--;
0394     }
0395 
0396 done:
0397     /* Skip over any unrecognized fields */
0398     *p = end;
0399     return 0;
0400 
0401 bad:
0402     err = -EIO;
0403 out_bad:
0404     pr_err("problem parsing dir contents %d\n", err);
0405     return err;
0406 }
0407 
0408 /*
0409  * parse fcntl F_GETLK results
0410  */
0411 static int parse_reply_info_filelock(void **p, void *end,
0412                      struct ceph_mds_reply_info_parsed *info,
0413                      u64 features)
0414 {
0415     if (*p + sizeof(*info->filelock_reply) > end)
0416         goto bad;
0417 
0418     info->filelock_reply = *p;
0419 
0420     /* Skip over any unrecognized fields */
0421     *p = end;
0422     return 0;
0423 bad:
0424     return -EIO;
0425 }
0426 
0427 
0428 #if BITS_PER_LONG == 64
0429 
0430 #define DELEGATED_INO_AVAILABLE     xa_mk_value(1)
0431 
0432 static int ceph_parse_deleg_inos(void **p, void *end,
0433                  struct ceph_mds_session *s)
0434 {
0435     u32 sets;
0436 
0437     ceph_decode_32_safe(p, end, sets, bad);
0438     dout("got %u sets of delegated inodes\n", sets);
0439     while (sets--) {
0440         u64 start, len;
0441 
0442         ceph_decode_64_safe(p, end, start, bad);
0443         ceph_decode_64_safe(p, end, len, bad);
0444 
0445         /* Don't accept a delegation of system inodes */
0446         if (start < CEPH_INO_SYSTEM_BASE) {
0447             pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
0448                     start, len);
0449             continue;
0450         }
0451         while (len--) {
0452             int err = xa_insert(&s->s_delegated_inos, start++,
0453                         DELEGATED_INO_AVAILABLE,
0454                         GFP_KERNEL);
0455             if (!err) {
0456                 dout("added delegated inode 0x%llx\n",
0457                      start - 1);
0458             } else if (err == -EBUSY) {
0459                 pr_warn("MDS delegated inode 0x%llx more than once.\n",
0460                     start - 1);
0461             } else {
0462                 return err;
0463             }
0464         }
0465     }
0466     return 0;
0467 bad:
0468     return -EIO;
0469 }
0470 
0471 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
0472 {
0473     unsigned long ino;
0474     void *val;
0475 
0476     xa_for_each(&s->s_delegated_inos, ino, val) {
0477         val = xa_erase(&s->s_delegated_inos, ino);
0478         if (val == DELEGATED_INO_AVAILABLE)
0479             return ino;
0480     }
0481     return 0;
0482 }
0483 
0484 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
0485 {
0486     return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
0487              GFP_KERNEL);
0488 }
0489 #else /* BITS_PER_LONG == 64 */
0490 /*
0491  * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
0492  * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
0493  * and bottom words?
0494  */
0495 static int ceph_parse_deleg_inos(void **p, void *end,
0496                  struct ceph_mds_session *s)
0497 {
0498     u32 sets;
0499 
0500     ceph_decode_32_safe(p, end, sets, bad);
0501     if (sets)
0502         ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
0503     return 0;
0504 bad:
0505     return -EIO;
0506 }
0507 
0508 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
0509 {
0510     return 0;
0511 }
0512 
0513 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
0514 {
0515     return 0;
0516 }
0517 #endif /* BITS_PER_LONG == 64 */
0518 
0519 /*
0520  * parse create results
0521  */
0522 static int parse_reply_info_create(void **p, void *end,
0523                   struct ceph_mds_reply_info_parsed *info,
0524                   u64 features, struct ceph_mds_session *s)
0525 {
0526     int ret;
0527 
0528     if (features == (u64)-1 ||
0529         (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
0530         if (*p == end) {
0531             /* Malformed reply? */
0532             info->has_create_ino = false;
0533         } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
0534             info->has_create_ino = true;
0535             /* struct_v, struct_compat, and len */
0536             ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
0537             ceph_decode_64_safe(p, end, info->ino, bad);
0538             ret = ceph_parse_deleg_inos(p, end, s);
0539             if (ret)
0540                 return ret;
0541         } else {
0542             /* legacy */
0543             ceph_decode_64_safe(p, end, info->ino, bad);
0544             info->has_create_ino = true;
0545         }
0546     } else {
0547         if (*p != end)
0548             goto bad;
0549     }
0550 
0551     /* Skip over any unrecognized fields */
0552     *p = end;
0553     return 0;
0554 bad:
0555     return -EIO;
0556 }
0557 
0558 static int parse_reply_info_getvxattr(void **p, void *end,
0559                       struct ceph_mds_reply_info_parsed *info,
0560                       u64 features)
0561 {
0562     u32 value_len;
0563 
0564     ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
0565     ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
0566     ceph_decode_skip_32(p, end, bad); /* skip payload length */
0567 
0568     ceph_decode_32_safe(p, end, value_len, bad);
0569 
0570     if (value_len == end - *p) {
0571       info->xattr_info.xattr_value = *p;
0572       info->xattr_info.xattr_value_len = value_len;
0573       *p = end;
0574       return value_len;
0575     }
0576 bad:
0577     return -EIO;
0578 }
0579 
0580 /*
0581  * parse extra results
0582  */
0583 static int parse_reply_info_extra(void **p, void *end,
0584                   struct ceph_mds_reply_info_parsed *info,
0585                   u64 features, struct ceph_mds_session *s)
0586 {
0587     u32 op = le32_to_cpu(info->head->op);
0588 
0589     if (op == CEPH_MDS_OP_GETFILELOCK)
0590         return parse_reply_info_filelock(p, end, info, features);
0591     else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
0592         return parse_reply_info_readdir(p, end, info, features);
0593     else if (op == CEPH_MDS_OP_CREATE)
0594         return parse_reply_info_create(p, end, info, features, s);
0595     else if (op == CEPH_MDS_OP_GETVXATTR)
0596         return parse_reply_info_getvxattr(p, end, info, features);
0597     else
0598         return -EIO;
0599 }
0600 
0601 /*
0602  * parse entire mds reply
0603  */
0604 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
0605                 struct ceph_mds_reply_info_parsed *info,
0606                 u64 features)
0607 {
0608     void *p, *end;
0609     u32 len;
0610     int err;
0611 
0612     info->head = msg->front.iov_base;
0613     p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
0614     end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
0615 
0616     /* trace */
0617     ceph_decode_32_safe(&p, end, len, bad);
0618     if (len > 0) {
0619         ceph_decode_need(&p, end, len, bad);
0620         err = parse_reply_info_trace(&p, p+len, info, features);
0621         if (err < 0)
0622             goto out_bad;
0623     }
0624 
0625     /* extra */
0626     ceph_decode_32_safe(&p, end, len, bad);
0627     if (len > 0) {
0628         ceph_decode_need(&p, end, len, bad);
0629         err = parse_reply_info_extra(&p, p+len, info, features, s);
0630         if (err < 0)
0631             goto out_bad;
0632     }
0633 
0634     /* snap blob */
0635     ceph_decode_32_safe(&p, end, len, bad);
0636     info->snapblob_len = len;
0637     info->snapblob = p;
0638     p += len;
0639 
0640     if (p != end)
0641         goto bad;
0642     return 0;
0643 
0644 bad:
0645     err = -EIO;
0646 out_bad:
0647     pr_err("mds parse_reply err %d\n", err);
0648     return err;
0649 }
0650 
0651 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
0652 {
0653     if (!info->dir_entries)
0654         return;
0655     free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
0656 }
0657 
0658 /*
0659  * In async unlink case the kclient won't wait for the first reply
0660  * from MDS and just drop all the links and unhash the dentry and then
0661  * succeeds immediately.
0662  *
0663  * For any new create/link/rename,etc requests followed by using the
0664  * same file names we must wait for the first reply of the inflight
0665  * unlink request, or the MDS possibly will fail these following
0666  * requests with -EEXIST if the inflight async unlink request was
0667  * delayed for some reasons.
0668  *
0669  * And the worst case is that for the none async openc request it will
0670  * successfully open the file if the CDentry hasn't been unlinked yet,
0671  * but later the previous delayed async unlink request will remove the
0672  * CDenty. That means the just created file is possiblly deleted later
0673  * by accident.
0674  *
0675  * We need to wait for the inflight async unlink requests to finish
0676  * when creating new files/directories by using the same file names.
0677  */
0678 int ceph_wait_on_conflict_unlink(struct dentry *dentry)
0679 {
0680     struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
0681     struct dentry *pdentry = dentry->d_parent;
0682     struct dentry *udentry, *found = NULL;
0683     struct ceph_dentry_info *di;
0684     struct qstr dname;
0685     u32 hash = dentry->d_name.hash;
0686     int err;
0687 
0688     dname.name = dentry->d_name.name;
0689     dname.len = dentry->d_name.len;
0690 
0691     rcu_read_lock();
0692     hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
0693                    hnode, hash) {
0694         udentry = di->dentry;
0695 
0696         spin_lock(&udentry->d_lock);
0697         if (udentry->d_name.hash != hash)
0698             goto next;
0699         if (unlikely(udentry->d_parent != pdentry))
0700             goto next;
0701         if (!hash_hashed(&di->hnode))
0702             goto next;
0703 
0704         if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
0705             pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
0706                 __func__, dentry, dentry);
0707 
0708         if (!d_same_name(udentry, pdentry, &dname))
0709             goto next;
0710 
0711         spin_unlock(&udentry->d_lock);
0712         found = dget(udentry);
0713         break;
0714 next:
0715         spin_unlock(&udentry->d_lock);
0716     }
0717     rcu_read_unlock();
0718 
0719     if (likely(!found))
0720         return 0;
0721 
0722     dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
0723          dentry, dentry, found, found);
0724 
0725     err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
0726               TASK_KILLABLE);
0727     dput(found);
0728     return err;
0729 }
0730 
0731 
0732 /*
0733  * sessions
0734  */
0735 const char *ceph_session_state_name(int s)
0736 {
0737     switch (s) {
0738     case CEPH_MDS_SESSION_NEW: return "new";
0739     case CEPH_MDS_SESSION_OPENING: return "opening";
0740     case CEPH_MDS_SESSION_OPEN: return "open";
0741     case CEPH_MDS_SESSION_HUNG: return "hung";
0742     case CEPH_MDS_SESSION_CLOSING: return "closing";
0743     case CEPH_MDS_SESSION_CLOSED: return "closed";
0744     case CEPH_MDS_SESSION_RESTARTING: return "restarting";
0745     case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
0746     case CEPH_MDS_SESSION_REJECTED: return "rejected";
0747     default: return "???";
0748     }
0749 }
0750 
0751 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
0752 {
0753     if (refcount_inc_not_zero(&s->s_ref))
0754         return s;
0755     return NULL;
0756 }
0757 
0758 void ceph_put_mds_session(struct ceph_mds_session *s)
0759 {
0760     if (IS_ERR_OR_NULL(s))
0761         return;
0762 
0763     if (refcount_dec_and_test(&s->s_ref)) {
0764         if (s->s_auth.authorizer)
0765             ceph_auth_destroy_authorizer(s->s_auth.authorizer);
0766         WARN_ON(mutex_is_locked(&s->s_mutex));
0767         xa_destroy(&s->s_delegated_inos);
0768         kfree(s);
0769     }
0770 }
0771 
0772 /*
0773  * called under mdsc->mutex
0774  */
0775 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
0776                            int mds)
0777 {
0778     if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
0779         return NULL;
0780     return ceph_get_mds_session(mdsc->sessions[mds]);
0781 }
0782 
0783 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
0784 {
0785     if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
0786         return false;
0787     else
0788         return true;
0789 }
0790 
0791 static int __verify_registered_session(struct ceph_mds_client *mdsc,
0792                        struct ceph_mds_session *s)
0793 {
0794     if (s->s_mds >= mdsc->max_sessions ||
0795         mdsc->sessions[s->s_mds] != s)
0796         return -ENOENT;
0797     return 0;
0798 }
0799 
0800 /*
0801  * create+register a new session for given mds.
0802  * called under mdsc->mutex.
0803  */
0804 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
0805                          int mds)
0806 {
0807     struct ceph_mds_session *s;
0808 
0809     if (mds >= mdsc->mdsmap->possible_max_rank)
0810         return ERR_PTR(-EINVAL);
0811 
0812     s = kzalloc(sizeof(*s), GFP_NOFS);
0813     if (!s)
0814         return ERR_PTR(-ENOMEM);
0815 
0816     if (mds >= mdsc->max_sessions) {
0817         int newmax = 1 << get_count_order(mds + 1);
0818         struct ceph_mds_session **sa;
0819 
0820         dout("%s: realloc to %d\n", __func__, newmax);
0821         sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
0822         if (!sa)
0823             goto fail_realloc;
0824         if (mdsc->sessions) {
0825             memcpy(sa, mdsc->sessions,
0826                    mdsc->max_sessions * sizeof(void *));
0827             kfree(mdsc->sessions);
0828         }
0829         mdsc->sessions = sa;
0830         mdsc->max_sessions = newmax;
0831     }
0832 
0833     dout("%s: mds%d\n", __func__, mds);
0834     s->s_mdsc = mdsc;
0835     s->s_mds = mds;
0836     s->s_state = CEPH_MDS_SESSION_NEW;
0837     mutex_init(&s->s_mutex);
0838 
0839     ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
0840 
0841     atomic_set(&s->s_cap_gen, 1);
0842     s->s_cap_ttl = jiffies - 1;
0843 
0844     spin_lock_init(&s->s_cap_lock);
0845     INIT_LIST_HEAD(&s->s_caps);
0846     refcount_set(&s->s_ref, 1);
0847     INIT_LIST_HEAD(&s->s_waiting);
0848     INIT_LIST_HEAD(&s->s_unsafe);
0849     xa_init(&s->s_delegated_inos);
0850     INIT_LIST_HEAD(&s->s_cap_releases);
0851     INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
0852 
0853     INIT_LIST_HEAD(&s->s_cap_dirty);
0854     INIT_LIST_HEAD(&s->s_cap_flushing);
0855 
0856     mdsc->sessions[mds] = s;
0857     atomic_inc(&mdsc->num_sessions);
0858     refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
0859 
0860     ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
0861               ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
0862 
0863     return s;
0864 
0865 fail_realloc:
0866     kfree(s);
0867     return ERR_PTR(-ENOMEM);
0868 }
0869 
0870 /*
0871  * called under mdsc->mutex
0872  */
0873 static void __unregister_session(struct ceph_mds_client *mdsc,
0874                    struct ceph_mds_session *s)
0875 {
0876     dout("__unregister_session mds%d %p\n", s->s_mds, s);
0877     BUG_ON(mdsc->sessions[s->s_mds] != s);
0878     mdsc->sessions[s->s_mds] = NULL;
0879     ceph_con_close(&s->s_con);
0880     ceph_put_mds_session(s);
0881     atomic_dec(&mdsc->num_sessions);
0882 }
0883 
0884 /*
0885  * drop session refs in request.
0886  *
0887  * should be last request ref, or hold mdsc->mutex
0888  */
0889 static void put_request_session(struct ceph_mds_request *req)
0890 {
0891     if (req->r_session) {
0892         ceph_put_mds_session(req->r_session);
0893         req->r_session = NULL;
0894     }
0895 }
0896 
0897 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
0898                 void (*cb)(struct ceph_mds_session *),
0899                 bool check_state)
0900 {
0901     int mds;
0902 
0903     mutex_lock(&mdsc->mutex);
0904     for (mds = 0; mds < mdsc->max_sessions; ++mds) {
0905         struct ceph_mds_session *s;
0906 
0907         s = __ceph_lookup_mds_session(mdsc, mds);
0908         if (!s)
0909             continue;
0910 
0911         if (check_state && !check_session_state(s)) {
0912             ceph_put_mds_session(s);
0913             continue;
0914         }
0915 
0916         mutex_unlock(&mdsc->mutex);
0917         cb(s);
0918         ceph_put_mds_session(s);
0919         mutex_lock(&mdsc->mutex);
0920     }
0921     mutex_unlock(&mdsc->mutex);
0922 }
0923 
0924 void ceph_mdsc_release_request(struct kref *kref)
0925 {
0926     struct ceph_mds_request *req = container_of(kref,
0927                             struct ceph_mds_request,
0928                             r_kref);
0929     ceph_mdsc_release_dir_caps_no_check(req);
0930     destroy_reply_info(&req->r_reply_info);
0931     if (req->r_request)
0932         ceph_msg_put(req->r_request);
0933     if (req->r_reply)
0934         ceph_msg_put(req->r_reply);
0935     if (req->r_inode) {
0936         ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
0937         iput(req->r_inode);
0938     }
0939     if (req->r_parent) {
0940         ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
0941         iput(req->r_parent);
0942     }
0943     iput(req->r_target_inode);
0944     if (req->r_dentry)
0945         dput(req->r_dentry);
0946     if (req->r_old_dentry)
0947         dput(req->r_old_dentry);
0948     if (req->r_old_dentry_dir) {
0949         /*
0950          * track (and drop pins for) r_old_dentry_dir
0951          * separately, since r_old_dentry's d_parent may have
0952          * changed between the dir mutex being dropped and
0953          * this request being freed.
0954          */
0955         ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
0956                   CEPH_CAP_PIN);
0957         iput(req->r_old_dentry_dir);
0958     }
0959     kfree(req->r_path1);
0960     kfree(req->r_path2);
0961     put_cred(req->r_cred);
0962     if (req->r_pagelist)
0963         ceph_pagelist_release(req->r_pagelist);
0964     put_request_session(req);
0965     ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
0966     WARN_ON_ONCE(!list_empty(&req->r_wait));
0967     kmem_cache_free(ceph_mds_request_cachep, req);
0968 }
0969 
0970 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
0971 
0972 /*
0973  * lookup session, bump ref if found.
0974  *
0975  * called under mdsc->mutex.
0976  */
0977 static struct ceph_mds_request *
0978 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
0979 {
0980     struct ceph_mds_request *req;
0981 
0982     req = lookup_request(&mdsc->request_tree, tid);
0983     if (req)
0984         ceph_mdsc_get_request(req);
0985 
0986     return req;
0987 }
0988 
0989 /*
0990  * Register an in-flight request, and assign a tid.  Link to directory
0991  * are modifying (if any).
0992  *
0993  * Called under mdsc->mutex.
0994  */
0995 static void __register_request(struct ceph_mds_client *mdsc,
0996                    struct ceph_mds_request *req,
0997                    struct inode *dir)
0998 {
0999     int ret = 0;
1000 
1001     req->r_tid = ++mdsc->last_tid;
1002     if (req->r_num_caps) {
1003         ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
1004                     req->r_num_caps);
1005         if (ret < 0) {
1006             pr_err("__register_request %p "
1007                    "failed to reserve caps: %d\n", req, ret);
1008             /* set req->r_err to fail early from __do_request */
1009             req->r_err = ret;
1010             return;
1011         }
1012     }
1013     dout("__register_request %p tid %lld\n", req, req->r_tid);
1014     ceph_mdsc_get_request(req);
1015     insert_request(&mdsc->request_tree, req);
1016 
1017     req->r_cred = get_current_cred();
1018 
1019     if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
1020         mdsc->oldest_tid = req->r_tid;
1021 
1022     if (dir) {
1023         struct ceph_inode_info *ci = ceph_inode(dir);
1024 
1025         ihold(dir);
1026         req->r_unsafe_dir = dir;
1027         spin_lock(&ci->i_unsafe_lock);
1028         list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
1029         spin_unlock(&ci->i_unsafe_lock);
1030     }
1031 }
1032 
1033 static void __unregister_request(struct ceph_mds_client *mdsc,
1034                  struct ceph_mds_request *req)
1035 {
1036     dout("__unregister_request %p tid %lld\n", req, req->r_tid);
1037 
1038     /* Never leave an unregistered request on an unsafe list! */
1039     list_del_init(&req->r_unsafe_item);
1040 
1041     if (req->r_tid == mdsc->oldest_tid) {
1042         struct rb_node *p = rb_next(&req->r_node);
1043         mdsc->oldest_tid = 0;
1044         while (p) {
1045             struct ceph_mds_request *next_req =
1046                 rb_entry(p, struct ceph_mds_request, r_node);
1047             if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
1048                 mdsc->oldest_tid = next_req->r_tid;
1049                 break;
1050             }
1051             p = rb_next(p);
1052         }
1053     }
1054 
1055     erase_request(&mdsc->request_tree, req);
1056 
1057     if (req->r_unsafe_dir) {
1058         struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
1059         spin_lock(&ci->i_unsafe_lock);
1060         list_del_init(&req->r_unsafe_dir_item);
1061         spin_unlock(&ci->i_unsafe_lock);
1062     }
1063     if (req->r_target_inode &&
1064         test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
1065         struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
1066         spin_lock(&ci->i_unsafe_lock);
1067         list_del_init(&req->r_unsafe_target_item);
1068         spin_unlock(&ci->i_unsafe_lock);
1069     }
1070 
1071     if (req->r_unsafe_dir) {
1072         iput(req->r_unsafe_dir);
1073         req->r_unsafe_dir = NULL;
1074     }
1075 
1076     complete_all(&req->r_safe_completion);
1077 
1078     ceph_mdsc_put_request(req);
1079 }
1080 
1081 /*
1082  * Walk back up the dentry tree until we hit a dentry representing a
1083  * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1084  * when calling this) to ensure that the objects won't disappear while we're
1085  * working with them. Once we hit a candidate dentry, we attempt to take a
1086  * reference to it, and return that as the result.
1087  */
1088 static struct inode *get_nonsnap_parent(struct dentry *dentry)
1089 {
1090     struct inode *inode = NULL;
1091 
1092     while (dentry && !IS_ROOT(dentry)) {
1093         inode = d_inode_rcu(dentry);
1094         if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1095             break;
1096         dentry = dentry->d_parent;
1097     }
1098     if (inode)
1099         inode = igrab(inode);
1100     return inode;
1101 }
1102 
1103 /*
1104  * Choose mds to send request to next.  If there is a hint set in the
1105  * request (e.g., due to a prior forward hint from the mds), use that.
1106  * Otherwise, consult frag tree and/or caps to identify the
1107  * appropriate mds.  If all else fails, choose randomly.
1108  *
1109  * Called under mdsc->mutex.
1110  */
1111 static int __choose_mds(struct ceph_mds_client *mdsc,
1112             struct ceph_mds_request *req,
1113             bool *random)
1114 {
1115     struct inode *inode;
1116     struct ceph_inode_info *ci;
1117     struct ceph_cap *cap;
1118     int mode = req->r_direct_mode;
1119     int mds = -1;
1120     u32 hash = req->r_direct_hash;
1121     bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1122 
1123     if (random)
1124         *random = false;
1125 
1126     /*
1127      * is there a specific mds we should try?  ignore hint if we have
1128      * no session and the mds is not up (active or recovering).
1129      */
1130     if (req->r_resend_mds >= 0 &&
1131         (__have_session(mdsc, req->r_resend_mds) ||
1132          ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1133         dout("%s using resend_mds mds%d\n", __func__,
1134              req->r_resend_mds);
1135         return req->r_resend_mds;
1136     }
1137 
1138     if (mode == USE_RANDOM_MDS)
1139         goto random;
1140 
1141     inode = NULL;
1142     if (req->r_inode) {
1143         if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1144             inode = req->r_inode;
1145             ihold(inode);
1146         } else {
1147             /* req->r_dentry is non-null for LSSNAP request */
1148             rcu_read_lock();
1149             inode = get_nonsnap_parent(req->r_dentry);
1150             rcu_read_unlock();
1151             dout("%s using snapdir's parent %p\n", __func__, inode);
1152         }
1153     } else if (req->r_dentry) {
1154         /* ignore race with rename; old or new d_parent is okay */
1155         struct dentry *parent;
1156         struct inode *dir;
1157 
1158         rcu_read_lock();
1159         parent = READ_ONCE(req->r_dentry->d_parent);
1160         dir = req->r_parent ? : d_inode_rcu(parent);
1161 
1162         if (!dir || dir->i_sb != mdsc->fsc->sb) {
1163             /*  not this fs or parent went negative */
1164             inode = d_inode(req->r_dentry);
1165             if (inode)
1166                 ihold(inode);
1167         } else if (ceph_snap(dir) != CEPH_NOSNAP) {
1168             /* direct snapped/virtual snapdir requests
1169              * based on parent dir inode */
1170             inode = get_nonsnap_parent(parent);
1171             dout("%s using nonsnap parent %p\n", __func__, inode);
1172         } else {
1173             /* dentry target */
1174             inode = d_inode(req->r_dentry);
1175             if (!inode || mode == USE_AUTH_MDS) {
1176                 /* dir + name */
1177                 inode = igrab(dir);
1178                 hash = ceph_dentry_hash(dir, req->r_dentry);
1179                 is_hash = true;
1180             } else {
1181                 ihold(inode);
1182             }
1183         }
1184         rcu_read_unlock();
1185     }
1186 
1187     dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1188          hash, mode);
1189     if (!inode)
1190         goto random;
1191     ci = ceph_inode(inode);
1192 
1193     if (is_hash && S_ISDIR(inode->i_mode)) {
1194         struct ceph_inode_frag frag;
1195         int found;
1196 
1197         ceph_choose_frag(ci, hash, &frag, &found);
1198         if (found) {
1199             if (mode == USE_ANY_MDS && frag.ndist > 0) {
1200                 u8 r;
1201 
1202                 /* choose a random replica */
1203                 get_random_bytes(&r, 1);
1204                 r %= frag.ndist;
1205                 mds = frag.dist[r];
1206                 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1207                      __func__, inode, ceph_vinop(inode),
1208                      frag.frag, mds, (int)r, frag.ndist);
1209                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1210                     CEPH_MDS_STATE_ACTIVE &&
1211                     !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1212                     goto out;
1213             }
1214 
1215             /* since this file/dir wasn't known to be
1216              * replicated, then we want to look for the
1217              * authoritative mds. */
1218             if (frag.mds >= 0) {
1219                 /* choose auth mds */
1220                 mds = frag.mds;
1221                 dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1222                      __func__, inode, ceph_vinop(inode),
1223                      frag.frag, mds);
1224                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1225                     CEPH_MDS_STATE_ACTIVE) {
1226                     if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1227                                   mds))
1228                         goto out;
1229                 }
1230             }
1231             mode = USE_AUTH_MDS;
1232         }
1233     }
1234 
1235     spin_lock(&ci->i_ceph_lock);
1236     cap = NULL;
1237     if (mode == USE_AUTH_MDS)
1238         cap = ci->i_auth_cap;
1239     if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1240         cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1241     if (!cap) {
1242         spin_unlock(&ci->i_ceph_lock);
1243         iput(inode);
1244         goto random;
1245     }
1246     mds = cap->session->s_mds;
1247     dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1248          inode, ceph_vinop(inode), mds,
1249          cap == ci->i_auth_cap ? "auth " : "", cap);
1250     spin_unlock(&ci->i_ceph_lock);
1251 out:
1252     iput(inode);
1253     return mds;
1254 
1255 random:
1256     if (random)
1257         *random = true;
1258 
1259     mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1260     dout("%s chose random mds%d\n", __func__, mds);
1261     return mds;
1262 }
1263 
1264 
1265 /*
1266  * session messages
1267  */
1268 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1269 {
1270     struct ceph_msg *msg;
1271     struct ceph_mds_session_head *h;
1272 
1273     msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1274                false);
1275     if (!msg) {
1276         pr_err("ENOMEM creating session %s msg\n",
1277                ceph_session_op_name(op));
1278         return NULL;
1279     }
1280     h = msg->front.iov_base;
1281     h->op = cpu_to_le32(op);
1282     h->seq = cpu_to_le64(seq);
1283 
1284     return msg;
1285 }
1286 
1287 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1288 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1289 static int encode_supported_features(void **p, void *end)
1290 {
1291     static const size_t count = ARRAY_SIZE(feature_bits);
1292 
1293     if (count > 0) {
1294         size_t i;
1295         size_t size = FEATURE_BYTES(count);
1296         unsigned long bit;
1297 
1298         if (WARN_ON_ONCE(*p + 4 + size > end))
1299             return -ERANGE;
1300 
1301         ceph_encode_32(p, size);
1302         memset(*p, 0, size);
1303         for (i = 0; i < count; i++) {
1304             bit = feature_bits[i];
1305             ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1306         }
1307         *p += size;
1308     } else {
1309         if (WARN_ON_ONCE(*p + 4 > end))
1310             return -ERANGE;
1311 
1312         ceph_encode_32(p, 0);
1313     }
1314 
1315     return 0;
1316 }
1317 
1318 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1319 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1320 static int encode_metric_spec(void **p, void *end)
1321 {
1322     static const size_t count = ARRAY_SIZE(metric_bits);
1323 
1324     /* header */
1325     if (WARN_ON_ONCE(*p + 2 > end))
1326         return -ERANGE;
1327 
1328     ceph_encode_8(p, 1); /* version */
1329     ceph_encode_8(p, 1); /* compat */
1330 
1331     if (count > 0) {
1332         size_t i;
1333         size_t size = METRIC_BYTES(count);
1334 
1335         if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1336             return -ERANGE;
1337 
1338         /* metric spec info length */
1339         ceph_encode_32(p, 4 + size);
1340 
1341         /* metric spec */
1342         ceph_encode_32(p, size);
1343         memset(*p, 0, size);
1344         for (i = 0; i < count; i++)
1345             ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1346         *p += size;
1347     } else {
1348         if (WARN_ON_ONCE(*p + 4 + 4 > end))
1349             return -ERANGE;
1350 
1351         /* metric spec info length */
1352         ceph_encode_32(p, 4);
1353         /* metric spec */
1354         ceph_encode_32(p, 0);
1355     }
1356 
1357     return 0;
1358 }
1359 
1360 /*
1361  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1362  * to include additional client metadata fields.
1363  */
1364 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1365 {
1366     struct ceph_msg *msg;
1367     struct ceph_mds_session_head *h;
1368     int i;
1369     int extra_bytes = 0;
1370     int metadata_key_count = 0;
1371     struct ceph_options *opt = mdsc->fsc->client->options;
1372     struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1373     size_t size, count;
1374     void *p, *end;
1375     int ret;
1376 
1377     const char* metadata[][2] = {
1378         {"hostname", mdsc->nodename},
1379         {"kernel_version", init_utsname()->release},
1380         {"entity_id", opt->name ? : ""},
1381         {"root", fsopt->server_path ? : "/"},
1382         {NULL, NULL}
1383     };
1384 
1385     /* Calculate serialized length of metadata */
1386     extra_bytes = 4;  /* map length */
1387     for (i = 0; metadata[i][0]; ++i) {
1388         extra_bytes += 8 + strlen(metadata[i][0]) +
1389             strlen(metadata[i][1]);
1390         metadata_key_count++;
1391     }
1392 
1393     /* supported feature */
1394     size = 0;
1395     count = ARRAY_SIZE(feature_bits);
1396     if (count > 0)
1397         size = FEATURE_BYTES(count);
1398     extra_bytes += 4 + size;
1399 
1400     /* metric spec */
1401     size = 0;
1402     count = ARRAY_SIZE(metric_bits);
1403     if (count > 0)
1404         size = METRIC_BYTES(count);
1405     extra_bytes += 2 + 4 + 4 + size;
1406 
1407     /* Allocate the message */
1408     msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1409                GFP_NOFS, false);
1410     if (!msg) {
1411         pr_err("ENOMEM creating session open msg\n");
1412         return ERR_PTR(-ENOMEM);
1413     }
1414     p = msg->front.iov_base;
1415     end = p + msg->front.iov_len;
1416 
1417     h = p;
1418     h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1419     h->seq = cpu_to_le64(seq);
1420 
1421     /*
1422      * Serialize client metadata into waiting buffer space, using
1423      * the format that userspace expects for map<string, string>
1424      *
1425      * ClientSession messages with metadata are v4
1426      */
1427     msg->hdr.version = cpu_to_le16(4);
1428     msg->hdr.compat_version = cpu_to_le16(1);
1429 
1430     /* The write pointer, following the session_head structure */
1431     p += sizeof(*h);
1432 
1433     /* Number of entries in the map */
1434     ceph_encode_32(&p, metadata_key_count);
1435 
1436     /* Two length-prefixed strings for each entry in the map */
1437     for (i = 0; metadata[i][0]; ++i) {
1438         size_t const key_len = strlen(metadata[i][0]);
1439         size_t const val_len = strlen(metadata[i][1]);
1440 
1441         ceph_encode_32(&p, key_len);
1442         memcpy(p, metadata[i][0], key_len);
1443         p += key_len;
1444         ceph_encode_32(&p, val_len);
1445         memcpy(p, metadata[i][1], val_len);
1446         p += val_len;
1447     }
1448 
1449     ret = encode_supported_features(&p, end);
1450     if (ret) {
1451         pr_err("encode_supported_features failed!\n");
1452         ceph_msg_put(msg);
1453         return ERR_PTR(ret);
1454     }
1455 
1456     ret = encode_metric_spec(&p, end);
1457     if (ret) {
1458         pr_err("encode_metric_spec failed!\n");
1459         ceph_msg_put(msg);
1460         return ERR_PTR(ret);
1461     }
1462 
1463     msg->front.iov_len = p - msg->front.iov_base;
1464     msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1465 
1466     return msg;
1467 }
1468 
1469 /*
1470  * send session open request.
1471  *
1472  * called under mdsc->mutex
1473  */
1474 static int __open_session(struct ceph_mds_client *mdsc,
1475               struct ceph_mds_session *session)
1476 {
1477     struct ceph_msg *msg;
1478     int mstate;
1479     int mds = session->s_mds;
1480 
1481     /* wait for mds to go active? */
1482     mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1483     dout("open_session to mds%d (%s)\n", mds,
1484          ceph_mds_state_name(mstate));
1485     session->s_state = CEPH_MDS_SESSION_OPENING;
1486     session->s_renew_requested = jiffies;
1487 
1488     /* send connect message */
1489     msg = create_session_open_msg(mdsc, session->s_seq);
1490     if (IS_ERR(msg))
1491         return PTR_ERR(msg);
1492     ceph_con_send(&session->s_con, msg);
1493     return 0;
1494 }
1495 
1496 /*
1497  * open sessions for any export targets for the given mds
1498  *
1499  * called under mdsc->mutex
1500  */
1501 static struct ceph_mds_session *
1502 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1503 {
1504     struct ceph_mds_session *session;
1505     int ret;
1506 
1507     session = __ceph_lookup_mds_session(mdsc, target);
1508     if (!session) {
1509         session = register_session(mdsc, target);
1510         if (IS_ERR(session))
1511             return session;
1512     }
1513     if (session->s_state == CEPH_MDS_SESSION_NEW ||
1514         session->s_state == CEPH_MDS_SESSION_CLOSING) {
1515         ret = __open_session(mdsc, session);
1516         if (ret)
1517             return ERR_PTR(ret);
1518     }
1519 
1520     return session;
1521 }
1522 
1523 struct ceph_mds_session *
1524 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1525 {
1526     struct ceph_mds_session *session;
1527 
1528     dout("open_export_target_session to mds%d\n", target);
1529 
1530     mutex_lock(&mdsc->mutex);
1531     session = __open_export_target_session(mdsc, target);
1532     mutex_unlock(&mdsc->mutex);
1533 
1534     return session;
1535 }
1536 
1537 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1538                       struct ceph_mds_session *session)
1539 {
1540     struct ceph_mds_info *mi;
1541     struct ceph_mds_session *ts;
1542     int i, mds = session->s_mds;
1543 
1544     if (mds >= mdsc->mdsmap->possible_max_rank)
1545         return;
1546 
1547     mi = &mdsc->mdsmap->m_info[mds];
1548     dout("open_export_target_sessions for mds%d (%d targets)\n",
1549          session->s_mds, mi->num_export_targets);
1550 
1551     for (i = 0; i < mi->num_export_targets; i++) {
1552         ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1553         ceph_put_mds_session(ts);
1554     }
1555 }
1556 
1557 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1558                        struct ceph_mds_session *session)
1559 {
1560     mutex_lock(&mdsc->mutex);
1561     __open_export_target_sessions(mdsc, session);
1562     mutex_unlock(&mdsc->mutex);
1563 }
1564 
1565 /*
1566  * session caps
1567  */
1568 
1569 static void detach_cap_releases(struct ceph_mds_session *session,
1570                 struct list_head *target)
1571 {
1572     lockdep_assert_held(&session->s_cap_lock);
1573 
1574     list_splice_init(&session->s_cap_releases, target);
1575     session->s_num_cap_releases = 0;
1576     dout("dispose_cap_releases mds%d\n", session->s_mds);
1577 }
1578 
1579 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1580                  struct list_head *dispose)
1581 {
1582     while (!list_empty(dispose)) {
1583         struct ceph_cap *cap;
1584         /* zero out the in-progress message */
1585         cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1586         list_del(&cap->session_caps);
1587         ceph_put_cap(mdsc, cap);
1588     }
1589 }
1590 
1591 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1592                      struct ceph_mds_session *session)
1593 {
1594     struct ceph_mds_request *req;
1595     struct rb_node *p;
1596 
1597     dout("cleanup_session_requests mds%d\n", session->s_mds);
1598     mutex_lock(&mdsc->mutex);
1599     while (!list_empty(&session->s_unsafe)) {
1600         req = list_first_entry(&session->s_unsafe,
1601                        struct ceph_mds_request, r_unsafe_item);
1602         pr_warn_ratelimited(" dropping unsafe request %llu\n",
1603                     req->r_tid);
1604         if (req->r_target_inode)
1605             mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1606         if (req->r_unsafe_dir)
1607             mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1608         __unregister_request(mdsc, req);
1609     }
1610     /* zero r_attempts, so kick_requests() will re-send requests */
1611     p = rb_first(&mdsc->request_tree);
1612     while (p) {
1613         req = rb_entry(p, struct ceph_mds_request, r_node);
1614         p = rb_next(p);
1615         if (req->r_session &&
1616             req->r_session->s_mds == session->s_mds)
1617             req->r_attempts = 0;
1618     }
1619     mutex_unlock(&mdsc->mutex);
1620 }
1621 
1622 /*
1623  * Helper to safely iterate over all caps associated with a session, with
1624  * special care taken to handle a racing __ceph_remove_cap().
1625  *
1626  * Caller must hold session s_mutex.
1627  */
1628 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1629                   int (*cb)(struct inode *, struct ceph_cap *,
1630                     void *), void *arg)
1631 {
1632     struct list_head *p;
1633     struct ceph_cap *cap;
1634     struct inode *inode, *last_inode = NULL;
1635     struct ceph_cap *old_cap = NULL;
1636     int ret;
1637 
1638     dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1639     spin_lock(&session->s_cap_lock);
1640     p = session->s_caps.next;
1641     while (p != &session->s_caps) {
1642         cap = list_entry(p, struct ceph_cap, session_caps);
1643         inode = igrab(&cap->ci->netfs.inode);
1644         if (!inode) {
1645             p = p->next;
1646             continue;
1647         }
1648         session->s_cap_iterator = cap;
1649         spin_unlock(&session->s_cap_lock);
1650 
1651         if (last_inode) {
1652             iput(last_inode);
1653             last_inode = NULL;
1654         }
1655         if (old_cap) {
1656             ceph_put_cap(session->s_mdsc, old_cap);
1657             old_cap = NULL;
1658         }
1659 
1660         ret = cb(inode, cap, arg);
1661         last_inode = inode;
1662 
1663         spin_lock(&session->s_cap_lock);
1664         p = p->next;
1665         if (!cap->ci) {
1666             dout("iterate_session_caps  finishing cap %p removal\n",
1667                  cap);
1668             BUG_ON(cap->session != session);
1669             cap->session = NULL;
1670             list_del_init(&cap->session_caps);
1671             session->s_nr_caps--;
1672             atomic64_dec(&session->s_mdsc->metric.total_caps);
1673             if (cap->queue_release)
1674                 __ceph_queue_cap_release(session, cap);
1675             else
1676                 old_cap = cap;  /* put_cap it w/o locks held */
1677         }
1678         if (ret < 0)
1679             goto out;
1680     }
1681     ret = 0;
1682 out:
1683     session->s_cap_iterator = NULL;
1684     spin_unlock(&session->s_cap_lock);
1685 
1686     iput(last_inode);
1687     if (old_cap)
1688         ceph_put_cap(session->s_mdsc, old_cap);
1689 
1690     return ret;
1691 }
1692 
1693 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1694                   void *arg)
1695 {
1696     struct ceph_inode_info *ci = ceph_inode(inode);
1697     bool invalidate = false;
1698     int iputs;
1699 
1700     dout("removing cap %p, ci is %p, inode is %p\n",
1701          cap, ci, &ci->netfs.inode);
1702     spin_lock(&ci->i_ceph_lock);
1703     iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
1704     spin_unlock(&ci->i_ceph_lock);
1705 
1706     wake_up_all(&ci->i_cap_wq);
1707     if (invalidate)
1708         ceph_queue_invalidate(inode);
1709     while (iputs--)
1710         iput(inode);
1711     return 0;
1712 }
1713 
1714 /*
1715  * caller must hold session s_mutex
1716  */
1717 static void remove_session_caps(struct ceph_mds_session *session)
1718 {
1719     struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1720     struct super_block *sb = fsc->sb;
1721     LIST_HEAD(dispose);
1722 
1723     dout("remove_session_caps on %p\n", session);
1724     ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1725 
1726     wake_up_all(&fsc->mdsc->cap_flushing_wq);
1727 
1728     spin_lock(&session->s_cap_lock);
1729     if (session->s_nr_caps > 0) {
1730         struct inode *inode;
1731         struct ceph_cap *cap, *prev = NULL;
1732         struct ceph_vino vino;
1733         /*
1734          * iterate_session_caps() skips inodes that are being
1735          * deleted, we need to wait until deletions are complete.
1736          * __wait_on_freeing_inode() is designed for the job,
1737          * but it is not exported, so use lookup inode function
1738          * to access it.
1739          */
1740         while (!list_empty(&session->s_caps)) {
1741             cap = list_entry(session->s_caps.next,
1742                      struct ceph_cap, session_caps);
1743             if (cap == prev)
1744                 break;
1745             prev = cap;
1746             vino = cap->ci->i_vino;
1747             spin_unlock(&session->s_cap_lock);
1748 
1749             inode = ceph_find_inode(sb, vino);
1750             iput(inode);
1751 
1752             spin_lock(&session->s_cap_lock);
1753         }
1754     }
1755 
1756     // drop cap expires and unlock s_cap_lock
1757     detach_cap_releases(session, &dispose);
1758 
1759     BUG_ON(session->s_nr_caps > 0);
1760     BUG_ON(!list_empty(&session->s_cap_flushing));
1761     spin_unlock(&session->s_cap_lock);
1762     dispose_cap_releases(session->s_mdsc, &dispose);
1763 }
1764 
1765 enum {
1766     RECONNECT,
1767     RENEWCAPS,
1768     FORCE_RO,
1769 };
1770 
1771 /*
1772  * wake up any threads waiting on this session's caps.  if the cap is
1773  * old (didn't get renewed on the client reconnect), remove it now.
1774  *
1775  * caller must hold s_mutex.
1776  */
1777 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1778                   void *arg)
1779 {
1780     struct ceph_inode_info *ci = ceph_inode(inode);
1781     unsigned long ev = (unsigned long)arg;
1782 
1783     if (ev == RECONNECT) {
1784         spin_lock(&ci->i_ceph_lock);
1785         ci->i_wanted_max_size = 0;
1786         ci->i_requested_max_size = 0;
1787         spin_unlock(&ci->i_ceph_lock);
1788     } else if (ev == RENEWCAPS) {
1789         if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
1790             /* mds did not re-issue stale cap */
1791             spin_lock(&ci->i_ceph_lock);
1792             cap->issued = cap->implemented = CEPH_CAP_PIN;
1793             spin_unlock(&ci->i_ceph_lock);
1794         }
1795     } else if (ev == FORCE_RO) {
1796     }
1797     wake_up_all(&ci->i_cap_wq);
1798     return 0;
1799 }
1800 
1801 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1802 {
1803     dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1804     ceph_iterate_session_caps(session, wake_up_session_cb,
1805                   (void *)(unsigned long)ev);
1806 }
1807 
1808 /*
1809  * Send periodic message to MDS renewing all currently held caps.  The
1810  * ack will reset the expiration for all caps from this session.
1811  *
1812  * caller holds s_mutex
1813  */
1814 static int send_renew_caps(struct ceph_mds_client *mdsc,
1815                struct ceph_mds_session *session)
1816 {
1817     struct ceph_msg *msg;
1818     int state;
1819 
1820     if (time_after_eq(jiffies, session->s_cap_ttl) &&
1821         time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1822         pr_info("mds%d caps stale\n", session->s_mds);
1823     session->s_renew_requested = jiffies;
1824 
1825     /* do not try to renew caps until a recovering mds has reconnected
1826      * with its clients. */
1827     state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1828     if (state < CEPH_MDS_STATE_RECONNECT) {
1829         dout("send_renew_caps ignoring mds%d (%s)\n",
1830              session->s_mds, ceph_mds_state_name(state));
1831         return 0;
1832     }
1833 
1834     dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1835         ceph_mds_state_name(state));
1836     msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1837                       ++session->s_renew_seq);
1838     if (!msg)
1839         return -ENOMEM;
1840     ceph_con_send(&session->s_con, msg);
1841     return 0;
1842 }
1843 
1844 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1845                  struct ceph_mds_session *session, u64 seq)
1846 {
1847     struct ceph_msg *msg;
1848 
1849     dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1850          session->s_mds, ceph_session_state_name(session->s_state), seq);
1851     msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1852     if (!msg)
1853         return -ENOMEM;
1854     ceph_con_send(&session->s_con, msg);
1855     return 0;
1856 }
1857 
1858 
1859 /*
1860  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1861  *
1862  * Called under session->s_mutex
1863  */
1864 static void renewed_caps(struct ceph_mds_client *mdsc,
1865              struct ceph_mds_session *session, int is_renew)
1866 {
1867     int was_stale;
1868     int wake = 0;
1869 
1870     spin_lock(&session->s_cap_lock);
1871     was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1872 
1873     session->s_cap_ttl = session->s_renew_requested +
1874         mdsc->mdsmap->m_session_timeout*HZ;
1875 
1876     if (was_stale) {
1877         if (time_before(jiffies, session->s_cap_ttl)) {
1878             pr_info("mds%d caps renewed\n", session->s_mds);
1879             wake = 1;
1880         } else {
1881             pr_info("mds%d caps still stale\n", session->s_mds);
1882         }
1883     }
1884     dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1885          session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1886          time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1887     spin_unlock(&session->s_cap_lock);
1888 
1889     if (wake)
1890         wake_up_session_caps(session, RENEWCAPS);
1891 }
1892 
1893 /*
1894  * send a session close request
1895  */
1896 static int request_close_session(struct ceph_mds_session *session)
1897 {
1898     struct ceph_msg *msg;
1899 
1900     dout("request_close_session mds%d state %s seq %lld\n",
1901          session->s_mds, ceph_session_state_name(session->s_state),
1902          session->s_seq);
1903     msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1904                       session->s_seq);
1905     if (!msg)
1906         return -ENOMEM;
1907     ceph_con_send(&session->s_con, msg);
1908     return 1;
1909 }
1910 
1911 /*
1912  * Called with s_mutex held.
1913  */
1914 static int __close_session(struct ceph_mds_client *mdsc,
1915              struct ceph_mds_session *session)
1916 {
1917     if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1918         return 0;
1919     session->s_state = CEPH_MDS_SESSION_CLOSING;
1920     return request_close_session(session);
1921 }
1922 
1923 static bool drop_negative_children(struct dentry *dentry)
1924 {
1925     struct dentry *child;
1926     bool all_negative = true;
1927 
1928     if (!d_is_dir(dentry))
1929         goto out;
1930 
1931     spin_lock(&dentry->d_lock);
1932     list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1933         if (d_really_is_positive(child)) {
1934             all_negative = false;
1935             break;
1936         }
1937     }
1938     spin_unlock(&dentry->d_lock);
1939 
1940     if (all_negative)
1941         shrink_dcache_parent(dentry);
1942 out:
1943     return all_negative;
1944 }
1945 
1946 /*
1947  * Trim old(er) caps.
1948  *
1949  * Because we can't cache an inode without one or more caps, we do
1950  * this indirectly: if a cap is unused, we prune its aliases, at which
1951  * point the inode will hopefully get dropped to.
1952  *
1953  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1954  * memory pressure from the MDS, though, so it needn't be perfect.
1955  */
1956 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1957 {
1958     int *remaining = arg;
1959     struct ceph_inode_info *ci = ceph_inode(inode);
1960     int used, wanted, oissued, mine;
1961 
1962     if (*remaining <= 0)
1963         return -1;
1964 
1965     spin_lock(&ci->i_ceph_lock);
1966     mine = cap->issued | cap->implemented;
1967     used = __ceph_caps_used(ci);
1968     wanted = __ceph_caps_file_wanted(ci);
1969     oissued = __ceph_caps_issued_other(ci, cap);
1970 
1971     dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1972          inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1973          ceph_cap_string(used), ceph_cap_string(wanted));
1974     if (cap == ci->i_auth_cap) {
1975         if (ci->i_dirty_caps || ci->i_flushing_caps ||
1976             !list_empty(&ci->i_cap_snaps))
1977             goto out;
1978         if ((used | wanted) & CEPH_CAP_ANY_WR)
1979             goto out;
1980         /* Note: it's possible that i_filelock_ref becomes non-zero
1981          * after dropping auth caps. It doesn't hurt because reply
1982          * of lock mds request will re-add auth caps. */
1983         if (atomic_read(&ci->i_filelock_ref) > 0)
1984             goto out;
1985     }
1986     /* The inode has cached pages, but it's no longer used.
1987      * we can safely drop it */
1988     if (S_ISREG(inode->i_mode) &&
1989         wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1990         !(oissued & CEPH_CAP_FILE_CACHE)) {
1991       used = 0;
1992       oissued = 0;
1993     }
1994     if ((used | wanted) & ~oissued & mine)
1995         goto out;   /* we need these caps */
1996 
1997     if (oissued) {
1998         /* we aren't the only cap.. just remove us */
1999         ceph_remove_cap(cap, true);
2000         (*remaining)--;
2001     } else {
2002         struct dentry *dentry;
2003         /* try dropping referring dentries */
2004         spin_unlock(&ci->i_ceph_lock);
2005         dentry = d_find_any_alias(inode);
2006         if (dentry && drop_negative_children(dentry)) {
2007             int count;
2008             dput(dentry);
2009             d_prune_aliases(inode);
2010             count = atomic_read(&inode->i_count);
2011             if (count == 1)
2012                 (*remaining)--;
2013             dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2014                  inode, cap, count);
2015         } else {
2016             dput(dentry);
2017         }
2018         return 0;
2019     }
2020 
2021 out:
2022     spin_unlock(&ci->i_ceph_lock);
2023     return 0;
2024 }
2025 
2026 /*
2027  * Trim session cap count down to some max number.
2028  */
2029 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2030            struct ceph_mds_session *session,
2031            int max_caps)
2032 {
2033     int trim_caps = session->s_nr_caps - max_caps;
2034 
2035     dout("trim_caps mds%d start: %d / %d, trim %d\n",
2036          session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2037     if (trim_caps > 0) {
2038         int remaining = trim_caps;
2039 
2040         ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2041         dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2042              session->s_mds, session->s_nr_caps, max_caps,
2043             trim_caps - remaining);
2044     }
2045 
2046     ceph_flush_cap_releases(mdsc, session);
2047     return 0;
2048 }
2049 
2050 static int check_caps_flush(struct ceph_mds_client *mdsc,
2051                 u64 want_flush_tid)
2052 {
2053     int ret = 1;
2054 
2055     spin_lock(&mdsc->cap_dirty_lock);
2056     if (!list_empty(&mdsc->cap_flush_list)) {
2057         struct ceph_cap_flush *cf =
2058             list_first_entry(&mdsc->cap_flush_list,
2059                      struct ceph_cap_flush, g_list);
2060         if (cf->tid <= want_flush_tid) {
2061             dout("check_caps_flush still flushing tid "
2062                  "%llu <= %llu\n", cf->tid, want_flush_tid);
2063             ret = 0;
2064         }
2065     }
2066     spin_unlock(&mdsc->cap_dirty_lock);
2067     return ret;
2068 }
2069 
2070 /*
2071  * flush all dirty inode data to disk.
2072  *
2073  * returns true if we've flushed through want_flush_tid
2074  */
2075 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2076                 u64 want_flush_tid)
2077 {
2078     dout("check_caps_flush want %llu\n", want_flush_tid);
2079 
2080     wait_event(mdsc->cap_flushing_wq,
2081            check_caps_flush(mdsc, want_flush_tid));
2082 
2083     dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2084 }
2085 
2086 /*
2087  * called under s_mutex
2088  */
2089 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2090                    struct ceph_mds_session *session)
2091 {
2092     struct ceph_msg *msg = NULL;
2093     struct ceph_mds_cap_release *head;
2094     struct ceph_mds_cap_item *item;
2095     struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2096     struct ceph_cap *cap;
2097     LIST_HEAD(tmp_list);
2098     int num_cap_releases;
2099     __le32  barrier, *cap_barrier;
2100 
2101     down_read(&osdc->lock);
2102     barrier = cpu_to_le32(osdc->epoch_barrier);
2103     up_read(&osdc->lock);
2104 
2105     spin_lock(&session->s_cap_lock);
2106 again:
2107     list_splice_init(&session->s_cap_releases, &tmp_list);
2108     num_cap_releases = session->s_num_cap_releases;
2109     session->s_num_cap_releases = 0;
2110     spin_unlock(&session->s_cap_lock);
2111 
2112     while (!list_empty(&tmp_list)) {
2113         if (!msg) {
2114             msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2115                     PAGE_SIZE, GFP_NOFS, false);
2116             if (!msg)
2117                 goto out_err;
2118             head = msg->front.iov_base;
2119             head->num = cpu_to_le32(0);
2120             msg->front.iov_len = sizeof(*head);
2121 
2122             msg->hdr.version = cpu_to_le16(2);
2123             msg->hdr.compat_version = cpu_to_le16(1);
2124         }
2125 
2126         cap = list_first_entry(&tmp_list, struct ceph_cap,
2127                     session_caps);
2128         list_del(&cap->session_caps);
2129         num_cap_releases--;
2130 
2131         head = msg->front.iov_base;
2132         put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2133                    &head->num);
2134         item = msg->front.iov_base + msg->front.iov_len;
2135         item->ino = cpu_to_le64(cap->cap_ino);
2136         item->cap_id = cpu_to_le64(cap->cap_id);
2137         item->migrate_seq = cpu_to_le32(cap->mseq);
2138         item->seq = cpu_to_le32(cap->issue_seq);
2139         msg->front.iov_len += sizeof(*item);
2140 
2141         ceph_put_cap(mdsc, cap);
2142 
2143         if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2144             // Append cap_barrier field
2145             cap_barrier = msg->front.iov_base + msg->front.iov_len;
2146             *cap_barrier = barrier;
2147             msg->front.iov_len += sizeof(*cap_barrier);
2148 
2149             msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2150             dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2151             ceph_con_send(&session->s_con, msg);
2152             msg = NULL;
2153         }
2154     }
2155 
2156     BUG_ON(num_cap_releases != 0);
2157 
2158     spin_lock(&session->s_cap_lock);
2159     if (!list_empty(&session->s_cap_releases))
2160         goto again;
2161     spin_unlock(&session->s_cap_lock);
2162 
2163     if (msg) {
2164         // Append cap_barrier field
2165         cap_barrier = msg->front.iov_base + msg->front.iov_len;
2166         *cap_barrier = barrier;
2167         msg->front.iov_len += sizeof(*cap_barrier);
2168 
2169         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2170         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2171         ceph_con_send(&session->s_con, msg);
2172     }
2173     return;
2174 out_err:
2175     pr_err("send_cap_releases mds%d, failed to allocate message\n",
2176         session->s_mds);
2177     spin_lock(&session->s_cap_lock);
2178     list_splice(&tmp_list, &session->s_cap_releases);
2179     session->s_num_cap_releases += num_cap_releases;
2180     spin_unlock(&session->s_cap_lock);
2181 }
2182 
2183 static void ceph_cap_release_work(struct work_struct *work)
2184 {
2185     struct ceph_mds_session *session =
2186         container_of(work, struct ceph_mds_session, s_cap_release_work);
2187 
2188     mutex_lock(&session->s_mutex);
2189     if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2190         session->s_state == CEPH_MDS_SESSION_HUNG)
2191         ceph_send_cap_releases(session->s_mdsc, session);
2192     mutex_unlock(&session->s_mutex);
2193     ceph_put_mds_session(session);
2194 }
2195 
2196 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2197                      struct ceph_mds_session *session)
2198 {
2199     if (mdsc->stopping)
2200         return;
2201 
2202     ceph_get_mds_session(session);
2203     if (queue_work(mdsc->fsc->cap_wq,
2204                &session->s_cap_release_work)) {
2205         dout("cap release work queued\n");
2206     } else {
2207         ceph_put_mds_session(session);
2208         dout("failed to queue cap release work\n");
2209     }
2210 }
2211 
2212 /*
2213  * caller holds session->s_cap_lock
2214  */
2215 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2216                   struct ceph_cap *cap)
2217 {
2218     list_add_tail(&cap->session_caps, &session->s_cap_releases);
2219     session->s_num_cap_releases++;
2220 
2221     if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2222         ceph_flush_cap_releases(session->s_mdsc, session);
2223 }
2224 
2225 static void ceph_cap_reclaim_work(struct work_struct *work)
2226 {
2227     struct ceph_mds_client *mdsc =
2228         container_of(work, struct ceph_mds_client, cap_reclaim_work);
2229     int ret = ceph_trim_dentries(mdsc);
2230     if (ret == -EAGAIN)
2231         ceph_queue_cap_reclaim_work(mdsc);
2232 }
2233 
2234 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2235 {
2236     if (mdsc->stopping)
2237         return;
2238 
2239         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2240                 dout("caps reclaim work queued\n");
2241         } else {
2242                 dout("failed to queue caps release work\n");
2243         }
2244 }
2245 
2246 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2247 {
2248     int val;
2249     if (!nr)
2250         return;
2251     val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2252     if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2253         atomic_set(&mdsc->cap_reclaim_pending, 0);
2254         ceph_queue_cap_reclaim_work(mdsc);
2255     }
2256 }
2257 
2258 /*
2259  * requests
2260  */
2261 
2262 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2263                     struct inode *dir)
2264 {
2265     struct ceph_inode_info *ci = ceph_inode(dir);
2266     struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2267     struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2268     size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2269     unsigned int num_entries;
2270     int order;
2271 
2272     spin_lock(&ci->i_ceph_lock);
2273     num_entries = ci->i_files + ci->i_subdirs;
2274     spin_unlock(&ci->i_ceph_lock);
2275     num_entries = max(num_entries, 1U);
2276     num_entries = min(num_entries, opt->max_readdir);
2277 
2278     order = get_order(size * num_entries);
2279     while (order >= 0) {
2280         rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2281                                  __GFP_NOWARN |
2282                                  __GFP_ZERO,
2283                                  order);
2284         if (rinfo->dir_entries)
2285             break;
2286         order--;
2287     }
2288     if (!rinfo->dir_entries)
2289         return -ENOMEM;
2290 
2291     num_entries = (PAGE_SIZE << order) / size;
2292     num_entries = min(num_entries, opt->max_readdir);
2293 
2294     rinfo->dir_buf_size = PAGE_SIZE << order;
2295     req->r_num_caps = num_entries + 1;
2296     req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2297     req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2298     return 0;
2299 }
2300 
2301 /*
2302  * Create an mds request.
2303  */
2304 struct ceph_mds_request *
2305 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2306 {
2307     struct ceph_mds_request *req;
2308 
2309     req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2310     if (!req)
2311         return ERR_PTR(-ENOMEM);
2312 
2313     mutex_init(&req->r_fill_mutex);
2314     req->r_mdsc = mdsc;
2315     req->r_started = jiffies;
2316     req->r_start_latency = ktime_get();
2317     req->r_resend_mds = -1;
2318     INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2319     INIT_LIST_HEAD(&req->r_unsafe_target_item);
2320     req->r_fmode = -1;
2321     kref_init(&req->r_kref);
2322     RB_CLEAR_NODE(&req->r_node);
2323     INIT_LIST_HEAD(&req->r_wait);
2324     init_completion(&req->r_completion);
2325     init_completion(&req->r_safe_completion);
2326     INIT_LIST_HEAD(&req->r_unsafe_item);
2327 
2328     ktime_get_coarse_real_ts64(&req->r_stamp);
2329 
2330     req->r_op = op;
2331     req->r_direct_mode = mode;
2332     return req;
2333 }
2334 
2335 /*
2336  * return oldest (lowest) request, tid in request tree, 0 if none.
2337  *
2338  * called under mdsc->mutex.
2339  */
2340 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2341 {
2342     if (RB_EMPTY_ROOT(&mdsc->request_tree))
2343         return NULL;
2344     return rb_entry(rb_first(&mdsc->request_tree),
2345             struct ceph_mds_request, r_node);
2346 }
2347 
2348 static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2349 {
2350     return mdsc->oldest_tid;
2351 }
2352 
2353 /*
2354  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2355  * on build_path_from_dentry in fs/cifs/dir.c.
2356  *
2357  * If @stop_on_nosnap, generate path relative to the first non-snapped
2358  * inode.
2359  *
2360  * Encode hidden .snap dirs as a double /, i.e.
2361  *   foo/.snap/bar -> foo//bar
2362  */
2363 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2364                int stop_on_nosnap)
2365 {
2366     struct dentry *temp;
2367     char *path;
2368     int pos;
2369     unsigned seq;
2370     u64 base;
2371 
2372     if (!dentry)
2373         return ERR_PTR(-EINVAL);
2374 
2375     path = __getname();
2376     if (!path)
2377         return ERR_PTR(-ENOMEM);
2378 retry:
2379     pos = PATH_MAX - 1;
2380     path[pos] = '\0';
2381 
2382     seq = read_seqbegin(&rename_lock);
2383     rcu_read_lock();
2384     temp = dentry;
2385     for (;;) {
2386         struct inode *inode;
2387 
2388         spin_lock(&temp->d_lock);
2389         inode = d_inode(temp);
2390         if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2391             dout("build_path path+%d: %p SNAPDIR\n",
2392                  pos, temp);
2393         } else if (stop_on_nosnap && inode && dentry != temp &&
2394                ceph_snap(inode) == CEPH_NOSNAP) {
2395             spin_unlock(&temp->d_lock);
2396             pos++; /* get rid of any prepended '/' */
2397             break;
2398         } else {
2399             pos -= temp->d_name.len;
2400             if (pos < 0) {
2401                 spin_unlock(&temp->d_lock);
2402                 break;
2403             }
2404             memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2405         }
2406         spin_unlock(&temp->d_lock);
2407         temp = READ_ONCE(temp->d_parent);
2408 
2409         /* Are we at the root? */
2410         if (IS_ROOT(temp))
2411             break;
2412 
2413         /* Are we out of buffer? */
2414         if (--pos < 0)
2415             break;
2416 
2417         path[pos] = '/';
2418     }
2419     base = ceph_ino(d_inode(temp));
2420     rcu_read_unlock();
2421 
2422     if (read_seqretry(&rename_lock, seq))
2423         goto retry;
2424 
2425     if (pos < 0) {
2426         /*
2427          * A rename didn't occur, but somehow we didn't end up where
2428          * we thought we would. Throw a warning and try again.
2429          */
2430         pr_warn("build_path did not end path lookup where "
2431             "expected, pos is %d\n", pos);
2432         goto retry;
2433     }
2434 
2435     *pbase = base;
2436     *plen = PATH_MAX - 1 - pos;
2437     dout("build_path on %p %d built %llx '%.*s'\n",
2438          dentry, d_count(dentry), base, *plen, path + pos);
2439     return path + pos;
2440 }
2441 
2442 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2443                  const char **ppath, int *ppathlen, u64 *pino,
2444                  bool *pfreepath, bool parent_locked)
2445 {
2446     char *path;
2447 
2448     rcu_read_lock();
2449     if (!dir)
2450         dir = d_inode_rcu(dentry->d_parent);
2451     if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2452         *pino = ceph_ino(dir);
2453         rcu_read_unlock();
2454         *ppath = dentry->d_name.name;
2455         *ppathlen = dentry->d_name.len;
2456         return 0;
2457     }
2458     rcu_read_unlock();
2459     path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2460     if (IS_ERR(path))
2461         return PTR_ERR(path);
2462     *ppath = path;
2463     *pfreepath = true;
2464     return 0;
2465 }
2466 
2467 static int build_inode_path(struct inode *inode,
2468                 const char **ppath, int *ppathlen, u64 *pino,
2469                 bool *pfreepath)
2470 {
2471     struct dentry *dentry;
2472     char *path;
2473 
2474     if (ceph_snap(inode) == CEPH_NOSNAP) {
2475         *pino = ceph_ino(inode);
2476         *ppathlen = 0;
2477         return 0;
2478     }
2479     dentry = d_find_alias(inode);
2480     path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2481     dput(dentry);
2482     if (IS_ERR(path))
2483         return PTR_ERR(path);
2484     *ppath = path;
2485     *pfreepath = true;
2486     return 0;
2487 }
2488 
2489 /*
2490  * request arguments may be specified via an inode *, a dentry *, or
2491  * an explicit ino+path.
2492  */
2493 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2494                   struct inode *rdiri, const char *rpath,
2495                   u64 rino, const char **ppath, int *pathlen,
2496                   u64 *ino, bool *freepath, bool parent_locked)
2497 {
2498     int r = 0;
2499 
2500     if (rinode) {
2501         r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2502         dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2503              ceph_snap(rinode));
2504     } else if (rdentry) {
2505         r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2506                     freepath, parent_locked);
2507         dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2508              *ppath);
2509     } else if (rpath || rino) {
2510         *ino = rino;
2511         *ppath = rpath;
2512         *pathlen = rpath ? strlen(rpath) : 0;
2513         dout(" path %.*s\n", *pathlen, rpath);
2514     }
2515 
2516     return r;
2517 }
2518 
2519 static void encode_timestamp_and_gids(void **p,
2520                       const struct ceph_mds_request *req)
2521 {
2522     struct ceph_timespec ts;
2523     int i;
2524 
2525     ceph_encode_timespec64(&ts, &req->r_stamp);
2526     ceph_encode_copy(p, &ts, sizeof(ts));
2527 
2528     /* gid_list */
2529     ceph_encode_32(p, req->r_cred->group_info->ngroups);
2530     for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2531         ceph_encode_64(p, from_kgid(&init_user_ns,
2532                         req->r_cred->group_info->gid[i]));
2533 }
2534 
2535 /*
2536  * called under mdsc->mutex
2537  */
2538 static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2539                            struct ceph_mds_request *req,
2540                            bool drop_cap_releases)
2541 {
2542     int mds = session->s_mds;
2543     struct ceph_mds_client *mdsc = session->s_mdsc;
2544     struct ceph_msg *msg;
2545     struct ceph_mds_request_head_old *head;
2546     const char *path1 = NULL;
2547     const char *path2 = NULL;
2548     u64 ino1 = 0, ino2 = 0;
2549     int pathlen1 = 0, pathlen2 = 0;
2550     bool freepath1 = false, freepath2 = false;
2551     int len;
2552     u16 releases;
2553     void *p, *end;
2554     int ret;
2555     bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2556 
2557     ret = set_request_path_attr(req->r_inode, req->r_dentry,
2558                   req->r_parent, req->r_path1, req->r_ino1.ino,
2559                   &path1, &pathlen1, &ino1, &freepath1,
2560                   test_bit(CEPH_MDS_R_PARENT_LOCKED,
2561                     &req->r_req_flags));
2562     if (ret < 0) {
2563         msg = ERR_PTR(ret);
2564         goto out;
2565     }
2566 
2567     /* If r_old_dentry is set, then assume that its parent is locked */
2568     ret = set_request_path_attr(NULL, req->r_old_dentry,
2569                   req->r_old_dentry_dir,
2570                   req->r_path2, req->r_ino2.ino,
2571                   &path2, &pathlen2, &ino2, &freepath2, true);
2572     if (ret < 0) {
2573         msg = ERR_PTR(ret);
2574         goto out_free1;
2575     }
2576 
2577     len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2578     len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2579         sizeof(struct ceph_timespec);
2580     len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2581 
2582     /* calculate (max) length for cap releases */
2583     len += sizeof(struct ceph_mds_request_release) *
2584         (!!req->r_inode_drop + !!req->r_dentry_drop +
2585          !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2586 
2587     if (req->r_dentry_drop)
2588         len += pathlen1;
2589     if (req->r_old_dentry_drop)
2590         len += pathlen2;
2591 
2592     msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2593     if (!msg) {
2594         msg = ERR_PTR(-ENOMEM);
2595         goto out_free2;
2596     }
2597 
2598     msg->hdr.tid = cpu_to_le64(req->r_tid);
2599 
2600     /*
2601      * The old ceph_mds_request_head didn't contain a version field, and
2602      * one was added when we moved the message version from 3->4.
2603      */
2604     if (legacy) {
2605         msg->hdr.version = cpu_to_le16(3);
2606         head = msg->front.iov_base;
2607         p = msg->front.iov_base + sizeof(*head);
2608     } else {
2609         struct ceph_mds_request_head *new_head = msg->front.iov_base;
2610 
2611         msg->hdr.version = cpu_to_le16(4);
2612         new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2613         head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2614         p = msg->front.iov_base + sizeof(*new_head);
2615     }
2616 
2617     end = msg->front.iov_base + msg->front.iov_len;
2618 
2619     head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2620     head->op = cpu_to_le32(req->r_op);
2621     head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2622                          req->r_cred->fsuid));
2623     head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2624                          req->r_cred->fsgid));
2625     head->ino = cpu_to_le64(req->r_deleg_ino);
2626     head->args = req->r_args;
2627 
2628     ceph_encode_filepath(&p, end, ino1, path1);
2629     ceph_encode_filepath(&p, end, ino2, path2);
2630 
2631     /* make note of release offset, in case we need to replay */
2632     req->r_request_release_offset = p - msg->front.iov_base;
2633 
2634     /* cap releases */
2635     releases = 0;
2636     if (req->r_inode_drop)
2637         releases += ceph_encode_inode_release(&p,
2638               req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2639               mds, req->r_inode_drop, req->r_inode_unless,
2640               req->r_op == CEPH_MDS_OP_READDIR);
2641     if (req->r_dentry_drop)
2642         releases += ceph_encode_dentry_release(&p, req->r_dentry,
2643                 req->r_parent, mds, req->r_dentry_drop,
2644                 req->r_dentry_unless);
2645     if (req->r_old_dentry_drop)
2646         releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2647                 req->r_old_dentry_dir, mds,
2648                 req->r_old_dentry_drop,
2649                 req->r_old_dentry_unless);
2650     if (req->r_old_inode_drop)
2651         releases += ceph_encode_inode_release(&p,
2652               d_inode(req->r_old_dentry),
2653               mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2654 
2655     if (drop_cap_releases) {
2656         releases = 0;
2657         p = msg->front.iov_base + req->r_request_release_offset;
2658     }
2659 
2660     head->num_releases = cpu_to_le16(releases);
2661 
2662     encode_timestamp_and_gids(&p, req);
2663 
2664     if (WARN_ON_ONCE(p > end)) {
2665         ceph_msg_put(msg);
2666         msg = ERR_PTR(-ERANGE);
2667         goto out_free2;
2668     }
2669 
2670     msg->front.iov_len = p - msg->front.iov_base;
2671     msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2672 
2673     if (req->r_pagelist) {
2674         struct ceph_pagelist *pagelist = req->r_pagelist;
2675         ceph_msg_data_add_pagelist(msg, pagelist);
2676         msg->hdr.data_len = cpu_to_le32(pagelist->length);
2677     } else {
2678         msg->hdr.data_len = 0;
2679     }
2680 
2681     msg->hdr.data_off = cpu_to_le16(0);
2682 
2683 out_free2:
2684     if (freepath2)
2685         ceph_mdsc_free_path((char *)path2, pathlen2);
2686 out_free1:
2687     if (freepath1)
2688         ceph_mdsc_free_path((char *)path1, pathlen1);
2689 out:
2690     return msg;
2691 }
2692 
2693 /*
2694  * called under mdsc->mutex if error, under no mutex if
2695  * success.
2696  */
2697 static void complete_request(struct ceph_mds_client *mdsc,
2698                  struct ceph_mds_request *req)
2699 {
2700     req->r_end_latency = ktime_get();
2701 
2702     if (req->r_callback)
2703         req->r_callback(mdsc, req);
2704     complete_all(&req->r_completion);
2705 }
2706 
2707 static struct ceph_mds_request_head_old *
2708 find_old_request_head(void *p, u64 features)
2709 {
2710     bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2711     struct ceph_mds_request_head *new_head;
2712 
2713     if (legacy)
2714         return (struct ceph_mds_request_head_old *)p;
2715     new_head = (struct ceph_mds_request_head *)p;
2716     return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2717 }
2718 
2719 /*
2720  * called under mdsc->mutex
2721  */
2722 static int __prepare_send_request(struct ceph_mds_session *session,
2723                   struct ceph_mds_request *req,
2724                   bool drop_cap_releases)
2725 {
2726     int mds = session->s_mds;
2727     struct ceph_mds_client *mdsc = session->s_mdsc;
2728     struct ceph_mds_request_head_old *rhead;
2729     struct ceph_msg *msg;
2730     int flags = 0, max_retry;
2731 
2732     /*
2733      * The type of 'r_attempts' in kernel 'ceph_mds_request'
2734      * is 'int', while in 'ceph_mds_request_head' the type of
2735      * 'num_retry' is '__u8'. So in case the request retries
2736      *  exceeding 256 times, the MDS will receive a incorrect
2737      *  retry seq.
2738      *
2739      * In this case it's ususally a bug in MDS and continue
2740      * retrying the request makes no sense.
2741      *
2742      * In future this could be fixed in ceph code, so avoid
2743      * using the hardcode here.
2744      */
2745     max_retry = sizeof_field(struct ceph_mds_request_head, num_retry);
2746     max_retry = 1 << (max_retry * BITS_PER_BYTE);
2747     if (req->r_attempts >= max_retry) {
2748         pr_warn_ratelimited("%s request tid %llu seq overflow\n",
2749                     __func__, req->r_tid);
2750         return -EMULTIHOP;
2751     }
2752 
2753     req->r_attempts++;
2754     if (req->r_inode) {
2755         struct ceph_cap *cap =
2756             ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2757 
2758         if (cap)
2759             req->r_sent_on_mseq = cap->mseq;
2760         else
2761             req->r_sent_on_mseq = -1;
2762     }
2763     dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
2764          req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2765 
2766     if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2767         void *p;
2768 
2769         /*
2770          * Replay.  Do not regenerate message (and rebuild
2771          * paths, etc.); just use the original message.
2772          * Rebuilding paths will break for renames because
2773          * d_move mangles the src name.
2774          */
2775         msg = req->r_request;
2776         rhead = find_old_request_head(msg->front.iov_base,
2777                           session->s_con.peer_features);
2778 
2779         flags = le32_to_cpu(rhead->flags);
2780         flags |= CEPH_MDS_FLAG_REPLAY;
2781         rhead->flags = cpu_to_le32(flags);
2782 
2783         if (req->r_target_inode)
2784             rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2785 
2786         rhead->num_retry = req->r_attempts - 1;
2787 
2788         /* remove cap/dentry releases from message */
2789         rhead->num_releases = 0;
2790 
2791         p = msg->front.iov_base + req->r_request_release_offset;
2792         encode_timestamp_and_gids(&p, req);
2793 
2794         msg->front.iov_len = p - msg->front.iov_base;
2795         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2796         return 0;
2797     }
2798 
2799     if (req->r_request) {
2800         ceph_msg_put(req->r_request);
2801         req->r_request = NULL;
2802     }
2803     msg = create_request_message(session, req, drop_cap_releases);
2804     if (IS_ERR(msg)) {
2805         req->r_err = PTR_ERR(msg);
2806         return PTR_ERR(msg);
2807     }
2808     req->r_request = msg;
2809 
2810     rhead = find_old_request_head(msg->front.iov_base,
2811                       session->s_con.peer_features);
2812     rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2813     if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2814         flags |= CEPH_MDS_FLAG_REPLAY;
2815     if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2816         flags |= CEPH_MDS_FLAG_ASYNC;
2817     if (req->r_parent)
2818         flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2819     rhead->flags = cpu_to_le32(flags);
2820     rhead->num_fwd = req->r_num_fwd;
2821     rhead->num_retry = req->r_attempts - 1;
2822 
2823     dout(" r_parent = %p\n", req->r_parent);
2824     return 0;
2825 }
2826 
2827 /*
2828  * called under mdsc->mutex
2829  */
2830 static int __send_request(struct ceph_mds_session *session,
2831               struct ceph_mds_request *req,
2832               bool drop_cap_releases)
2833 {
2834     int err;
2835 
2836     err = __prepare_send_request(session, req, drop_cap_releases);
2837     if (!err) {
2838         ceph_msg_get(req->r_request);
2839         ceph_con_send(&session->s_con, req->r_request);
2840     }
2841 
2842     return err;
2843 }
2844 
2845 /*
2846  * send request, or put it on the appropriate wait list.
2847  */
2848 static void __do_request(struct ceph_mds_client *mdsc,
2849             struct ceph_mds_request *req)
2850 {
2851     struct ceph_mds_session *session = NULL;
2852     int mds = -1;
2853     int err = 0;
2854     bool random;
2855 
2856     if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2857         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2858             __unregister_request(mdsc, req);
2859         return;
2860     }
2861 
2862     if (req->r_timeout &&
2863         time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2864         dout("do_request timed out\n");
2865         err = -ETIMEDOUT;
2866         goto finish;
2867     }
2868     if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2869         dout("do_request forced umount\n");
2870         err = -EIO;
2871         goto finish;
2872     }
2873     if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2874         if (mdsc->mdsmap_err) {
2875             err = mdsc->mdsmap_err;
2876             dout("do_request mdsmap err %d\n", err);
2877             goto finish;
2878         }
2879         if (mdsc->mdsmap->m_epoch == 0) {
2880             dout("do_request no mdsmap, waiting for map\n");
2881             list_add(&req->r_wait, &mdsc->waiting_for_map);
2882             return;
2883         }
2884         if (!(mdsc->fsc->mount_options->flags &
2885               CEPH_MOUNT_OPT_MOUNTWAIT) &&
2886             !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2887             err = -EHOSTUNREACH;
2888             goto finish;
2889         }
2890     }
2891 
2892     put_request_session(req);
2893 
2894     mds = __choose_mds(mdsc, req, &random);
2895     if (mds < 0 ||
2896         ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2897         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2898             err = -EJUKEBOX;
2899             goto finish;
2900         }
2901         dout("do_request no mds or not active, waiting for map\n");
2902         list_add(&req->r_wait, &mdsc->waiting_for_map);
2903         return;
2904     }
2905 
2906     /* get, open session */
2907     session = __ceph_lookup_mds_session(mdsc, mds);
2908     if (!session) {
2909         session = register_session(mdsc, mds);
2910         if (IS_ERR(session)) {
2911             err = PTR_ERR(session);
2912             goto finish;
2913         }
2914     }
2915     req->r_session = ceph_get_mds_session(session);
2916 
2917     dout("do_request mds%d session %p state %s\n", mds, session,
2918          ceph_session_state_name(session->s_state));
2919     if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2920         session->s_state != CEPH_MDS_SESSION_HUNG) {
2921         /*
2922          * We cannot queue async requests since the caps and delegated
2923          * inodes are bound to the session. Just return -EJUKEBOX and
2924          * let the caller retry a sync request in that case.
2925          */
2926         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2927             err = -EJUKEBOX;
2928             goto out_session;
2929         }
2930 
2931         /*
2932          * If the session has been REJECTED, then return a hard error,
2933          * unless it's a CLEANRECOVER mount, in which case we'll queue
2934          * it to the mdsc queue.
2935          */
2936         if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2937             if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2938                 list_add(&req->r_wait, &mdsc->waiting_for_map);
2939             else
2940                 err = -EACCES;
2941             goto out_session;
2942         }
2943 
2944         if (session->s_state == CEPH_MDS_SESSION_NEW ||
2945             session->s_state == CEPH_MDS_SESSION_CLOSING) {
2946             err = __open_session(mdsc, session);
2947             if (err)
2948                 goto out_session;
2949             /* retry the same mds later */
2950             if (random)
2951                 req->r_resend_mds = mds;
2952         }
2953         list_add(&req->r_wait, &session->s_waiting);
2954         goto out_session;
2955     }
2956 
2957     /* send request */
2958     req->r_resend_mds = -1;   /* forget any previous mds hint */
2959 
2960     if (req->r_request_started == 0)   /* note request start time */
2961         req->r_request_started = jiffies;
2962 
2963     /*
2964      * For async create we will choose the auth MDS of frag in parent
2965      * directory to send the request and ususally this works fine, but
2966      * if the migrated the dirtory to another MDS before it could handle
2967      * it the request will be forwarded.
2968      *
2969      * And then the auth cap will be changed.
2970      */
2971     if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
2972         struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
2973         struct ceph_inode_info *ci;
2974         struct ceph_cap *cap;
2975 
2976         /*
2977          * The request maybe handled very fast and the new inode
2978          * hasn't been linked to the dentry yet. We need to wait
2979          * for the ceph_finish_async_create(), which shouldn't be
2980          * stuck too long or fail in thoery, to finish when forwarding
2981          * the request.
2982          */
2983         if (!d_inode(req->r_dentry)) {
2984             err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
2985                       TASK_KILLABLE);
2986             if (err) {
2987                 mutex_lock(&req->r_fill_mutex);
2988                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2989                 mutex_unlock(&req->r_fill_mutex);
2990                 goto out_session;
2991             }
2992         }
2993 
2994         ci = ceph_inode(d_inode(req->r_dentry));
2995 
2996         spin_lock(&ci->i_ceph_lock);
2997         cap = ci->i_auth_cap;
2998         if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
2999             dout("do_request session changed for auth cap %d -> %d\n",
3000                  cap->session->s_mds, session->s_mds);
3001 
3002             /* Remove the auth cap from old session */
3003             spin_lock(&cap->session->s_cap_lock);
3004             cap->session->s_nr_caps--;
3005             list_del_init(&cap->session_caps);
3006             spin_unlock(&cap->session->s_cap_lock);
3007 
3008             /* Add the auth cap to the new session */
3009             cap->mds = mds;
3010             cap->session = session;
3011             spin_lock(&session->s_cap_lock);
3012             session->s_nr_caps++;
3013             list_add_tail(&cap->session_caps, &session->s_caps);
3014             spin_unlock(&session->s_cap_lock);
3015 
3016             change_auth_cap_ses(ci, session);
3017         }
3018         spin_unlock(&ci->i_ceph_lock);
3019     }
3020 
3021     err = __send_request(session, req, false);
3022 
3023 out_session:
3024     ceph_put_mds_session(session);
3025 finish:
3026     if (err) {
3027         dout("__do_request early error %d\n", err);
3028         req->r_err = err;
3029         complete_request(mdsc, req);
3030         __unregister_request(mdsc, req);
3031     }
3032     return;
3033 }
3034 
3035 /*
3036  * called under mdsc->mutex
3037  */
3038 static void __wake_requests(struct ceph_mds_client *mdsc,
3039                 struct list_head *head)
3040 {
3041     struct ceph_mds_request *req;
3042     LIST_HEAD(tmp_list);
3043 
3044     list_splice_init(head, &tmp_list);
3045 
3046     while (!list_empty(&tmp_list)) {
3047         req = list_entry(tmp_list.next,
3048                  struct ceph_mds_request, r_wait);
3049         list_del_init(&req->r_wait);
3050         dout(" wake request %p tid %llu\n", req, req->r_tid);
3051         __do_request(mdsc, req);
3052     }
3053 }
3054 
3055 /*
3056  * Wake up threads with requests pending for @mds, so that they can
3057  * resubmit their requests to a possibly different mds.
3058  */
3059 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3060 {
3061     struct ceph_mds_request *req;
3062     struct rb_node *p = rb_first(&mdsc->request_tree);
3063 
3064     dout("kick_requests mds%d\n", mds);
3065     while (p) {
3066         req = rb_entry(p, struct ceph_mds_request, r_node);
3067         p = rb_next(p);
3068         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3069             continue;
3070         if (req->r_attempts > 0)
3071             continue; /* only new requests */
3072         if (req->r_session &&
3073             req->r_session->s_mds == mds) {
3074             dout(" kicking tid %llu\n", req->r_tid);
3075             list_del_init(&req->r_wait);
3076             __do_request(mdsc, req);
3077         }
3078     }
3079 }
3080 
3081 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3082                   struct ceph_mds_request *req)
3083 {
3084     int err = 0;
3085 
3086     /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3087     if (req->r_inode)
3088         ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3089     if (req->r_parent) {
3090         struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3091         int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3092                 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3093         spin_lock(&ci->i_ceph_lock);
3094         ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3095         __ceph_touch_fmode(ci, mdsc, fmode);
3096         spin_unlock(&ci->i_ceph_lock);
3097     }
3098     if (req->r_old_dentry_dir)
3099         ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3100                   CEPH_CAP_PIN);
3101 
3102     if (req->r_inode) {
3103         err = ceph_wait_on_async_create(req->r_inode);
3104         if (err) {
3105             dout("%s: wait for async create returned: %d\n",
3106                  __func__, err);
3107             return err;
3108         }
3109     }
3110 
3111     if (!err && req->r_old_inode) {
3112         err = ceph_wait_on_async_create(req->r_old_inode);
3113         if (err) {
3114             dout("%s: wait for async create returned: %d\n",
3115                  __func__, err);
3116             return err;
3117         }
3118     }
3119 
3120     dout("submit_request on %p for inode %p\n", req, dir);
3121     mutex_lock(&mdsc->mutex);
3122     __register_request(mdsc, req, dir);
3123     __do_request(mdsc, req);
3124     err = req->r_err;
3125     mutex_unlock(&mdsc->mutex);
3126     return err;
3127 }
3128 
3129 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3130                struct ceph_mds_request *req,
3131                ceph_mds_request_wait_callback_t wait_func)
3132 {
3133     int err;
3134 
3135     /* wait */
3136     dout("do_request waiting\n");
3137     if (wait_func) {
3138         err = wait_func(mdsc, req);
3139     } else {
3140         long timeleft = wait_for_completion_killable_timeout(
3141                     &req->r_completion,
3142                     ceph_timeout_jiffies(req->r_timeout));
3143         if (timeleft > 0)
3144             err = 0;
3145         else if (!timeleft)
3146             err = -ETIMEDOUT;  /* timed out */
3147         else
3148             err = timeleft;  /* killed */
3149     }
3150     dout("do_request waited, got %d\n", err);
3151     mutex_lock(&mdsc->mutex);
3152 
3153     /* only abort if we didn't race with a real reply */
3154     if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3155         err = le32_to_cpu(req->r_reply_info.head->result);
3156     } else if (err < 0) {
3157         dout("aborted request %lld with %d\n", req->r_tid, err);
3158 
3159         /*
3160          * ensure we aren't running concurrently with
3161          * ceph_fill_trace or ceph_readdir_prepopulate, which
3162          * rely on locks (dir mutex) held by our caller.
3163          */
3164         mutex_lock(&req->r_fill_mutex);
3165         req->r_err = err;
3166         set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3167         mutex_unlock(&req->r_fill_mutex);
3168 
3169         if (req->r_parent &&
3170             (req->r_op & CEPH_MDS_OP_WRITE))
3171             ceph_invalidate_dir_request(req);
3172     } else {
3173         err = req->r_err;
3174     }
3175 
3176     mutex_unlock(&mdsc->mutex);
3177     return err;
3178 }
3179 
3180 /*
3181  * Synchrously perform an mds request.  Take care of all of the
3182  * session setup, forwarding, retry details.
3183  */
3184 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3185              struct inode *dir,
3186              struct ceph_mds_request *req)
3187 {
3188     int err;
3189 
3190     dout("do_request on %p\n", req);
3191 
3192     /* issue */
3193     err = ceph_mdsc_submit_request(mdsc, dir, req);
3194     if (!err)
3195         err = ceph_mdsc_wait_request(mdsc, req, NULL);
3196     dout("do_request %p done, result %d\n", req, err);
3197     return err;
3198 }
3199 
3200 /*
3201  * Invalidate dir's completeness, dentry lease state on an aborted MDS
3202  * namespace request.
3203  */
3204 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3205 {
3206     struct inode *dir = req->r_parent;
3207     struct inode *old_dir = req->r_old_dentry_dir;
3208 
3209     dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3210 
3211     ceph_dir_clear_complete(dir);
3212     if (old_dir)
3213         ceph_dir_clear_complete(old_dir);
3214     if (req->r_dentry)
3215         ceph_invalidate_dentry_lease(req->r_dentry);
3216     if (req->r_old_dentry)
3217         ceph_invalidate_dentry_lease(req->r_old_dentry);
3218 }
3219 
3220 /*
3221  * Handle mds reply.
3222  *
3223  * We take the session mutex and parse and process the reply immediately.
3224  * This preserves the logical ordering of replies, capabilities, etc., sent
3225  * by the MDS as they are applied to our local cache.
3226  */
3227 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3228 {
3229     struct ceph_mds_client *mdsc = session->s_mdsc;
3230     struct ceph_mds_request *req;
3231     struct ceph_mds_reply_head *head = msg->front.iov_base;
3232     struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3233     struct ceph_snap_realm *realm;
3234     u64 tid;
3235     int err, result;
3236     int mds = session->s_mds;
3237 
3238     if (msg->front.iov_len < sizeof(*head)) {
3239         pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3240         ceph_msg_dump(msg);
3241         return;
3242     }
3243 
3244     /* get request, session */
3245     tid = le64_to_cpu(msg->hdr.tid);
3246     mutex_lock(&mdsc->mutex);
3247     req = lookup_get_request(mdsc, tid);
3248     if (!req) {
3249         dout("handle_reply on unknown tid %llu\n", tid);
3250         mutex_unlock(&mdsc->mutex);
3251         return;
3252     }
3253     dout("handle_reply %p\n", req);
3254 
3255     /* correct session? */
3256     if (req->r_session != session) {
3257         pr_err("mdsc_handle_reply got %llu on session mds%d"
3258                " not mds%d\n", tid, session->s_mds,
3259                req->r_session ? req->r_session->s_mds : -1);
3260         mutex_unlock(&mdsc->mutex);
3261         goto out;
3262     }
3263 
3264     /* dup? */
3265     if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3266         (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3267         pr_warn("got a dup %s reply on %llu from mds%d\n",
3268                head->safe ? "safe" : "unsafe", tid, mds);
3269         mutex_unlock(&mdsc->mutex);
3270         goto out;
3271     }
3272     if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3273         pr_warn("got unsafe after safe on %llu from mds%d\n",
3274                tid, mds);
3275         mutex_unlock(&mdsc->mutex);
3276         goto out;
3277     }
3278 
3279     result = le32_to_cpu(head->result);
3280 
3281     if (head->safe) {
3282         set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3283         __unregister_request(mdsc, req);
3284 
3285         /* last request during umount? */
3286         if (mdsc->stopping && !__get_oldest_req(mdsc))
3287             complete_all(&mdsc->safe_umount_waiters);
3288 
3289         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3290             /*
3291              * We already handled the unsafe response, now do the
3292              * cleanup.  No need to examine the response; the MDS
3293              * doesn't include any result info in the safe
3294              * response.  And even if it did, there is nothing
3295              * useful we could do with a revised return value.
3296              */
3297             dout("got safe reply %llu, mds%d\n", tid, mds);
3298 
3299             mutex_unlock(&mdsc->mutex);
3300             goto out;
3301         }
3302     } else {
3303         set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3304         list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3305     }
3306 
3307     dout("handle_reply tid %lld result %d\n", tid, result);
3308     rinfo = &req->r_reply_info;
3309     if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3310         err = parse_reply_info(session, msg, rinfo, (u64)-1);
3311     else
3312         err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3313     mutex_unlock(&mdsc->mutex);
3314 
3315     /* Must find target inode outside of mutexes to avoid deadlocks */
3316     if ((err >= 0) && rinfo->head->is_target) {
3317         struct inode *in;
3318         struct ceph_vino tvino = {
3319             .ino  = le64_to_cpu(rinfo->targeti.in->ino),
3320             .snap = le64_to_cpu(rinfo->targeti.in->snapid)
3321         };
3322 
3323         in = ceph_get_inode(mdsc->fsc->sb, tvino);
3324         if (IS_ERR(in)) {
3325             err = PTR_ERR(in);
3326             mutex_lock(&session->s_mutex);
3327             goto out_err;
3328         }
3329         req->r_target_inode = in;
3330     }
3331 
3332     mutex_lock(&session->s_mutex);
3333     if (err < 0) {
3334         pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3335         ceph_msg_dump(msg);
3336         goto out_err;
3337     }
3338 
3339     /* snap trace */
3340     realm = NULL;
3341     if (rinfo->snapblob_len) {
3342         down_write(&mdsc->snap_rwsem);
3343         ceph_update_snap_trace(mdsc, rinfo->snapblob,
3344                 rinfo->snapblob + rinfo->snapblob_len,
3345                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3346                 &realm);
3347         downgrade_write(&mdsc->snap_rwsem);
3348     } else {
3349         down_read(&mdsc->snap_rwsem);
3350     }
3351 
3352     /* insert trace into our cache */
3353     mutex_lock(&req->r_fill_mutex);
3354     current->journal_info = req;
3355     err = ceph_fill_trace(mdsc->fsc->sb, req);
3356     if (err == 0) {
3357         if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3358                     req->r_op == CEPH_MDS_OP_LSSNAP))
3359             ceph_readdir_prepopulate(req, req->r_session);
3360     }
3361     current->journal_info = NULL;
3362     mutex_unlock(&req->r_fill_mutex);
3363 
3364     up_read(&mdsc->snap_rwsem);
3365     if (realm)
3366         ceph_put_snap_realm(mdsc, realm);
3367 
3368     if (err == 0) {
3369         if (req->r_target_inode &&
3370             test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3371             struct ceph_inode_info *ci =
3372                 ceph_inode(req->r_target_inode);
3373             spin_lock(&ci->i_unsafe_lock);
3374             list_add_tail(&req->r_unsafe_target_item,
3375                       &ci->i_unsafe_iops);
3376             spin_unlock(&ci->i_unsafe_lock);
3377         }
3378 
3379         ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3380     }
3381 out_err:
3382     mutex_lock(&mdsc->mutex);
3383     if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3384         if (err) {
3385             req->r_err = err;
3386         } else {
3387             req->r_reply =  ceph_msg_get(msg);
3388             set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3389         }
3390     } else {
3391         dout("reply arrived after request %lld was aborted\n", tid);
3392     }
3393     mutex_unlock(&mdsc->mutex);
3394 
3395     mutex_unlock(&session->s_mutex);
3396 
3397     /* kick calling process */
3398     complete_request(mdsc, req);
3399 
3400     ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3401                      req->r_end_latency, err);
3402 out:
3403     ceph_mdsc_put_request(req);
3404     return;
3405 }
3406 
3407 
3408 
3409 /*
3410  * handle mds notification that our request has been forwarded.
3411  */
3412 static void handle_forward(struct ceph_mds_client *mdsc,
3413                struct ceph_mds_session *session,
3414                struct ceph_msg *msg)
3415 {
3416     struct ceph_mds_request *req;
3417     u64 tid = le64_to_cpu(msg->hdr.tid);
3418     u32 next_mds;
3419     u32 fwd_seq;
3420     int err = -EINVAL;
3421     void *p = msg->front.iov_base;
3422     void *end = p + msg->front.iov_len;
3423     bool aborted = false;
3424 
3425     ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3426     next_mds = ceph_decode_32(&p);
3427     fwd_seq = ceph_decode_32(&p);
3428 
3429     mutex_lock(&mdsc->mutex);
3430     req = lookup_get_request(mdsc, tid);
3431     if (!req) {
3432         mutex_unlock(&mdsc->mutex);
3433         dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3434         return;  /* dup reply? */
3435     }
3436 
3437     if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3438         dout("forward tid %llu aborted, unregistering\n", tid);
3439         __unregister_request(mdsc, req);
3440     } else if (fwd_seq <= req->r_num_fwd) {
3441         /*
3442          * The type of 'num_fwd' in ceph 'MClientRequestForward'
3443          * is 'int32_t', while in 'ceph_mds_request_head' the
3444          * type is '__u8'. So in case the request bounces between
3445          * MDSes exceeding 256 times, the client will get stuck.
3446          *
3447          * In this case it's ususally a bug in MDS and continue
3448          * bouncing the request makes no sense.
3449          *
3450          * In future this could be fixed in ceph code, so avoid
3451          * using the hardcode here.
3452          */
3453         int max = sizeof_field(struct ceph_mds_request_head, num_fwd);
3454         max = 1 << (max * BITS_PER_BYTE);
3455         if (req->r_num_fwd >= max) {
3456             mutex_lock(&req->r_fill_mutex);
3457             req->r_err = -EMULTIHOP;
3458             set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3459             mutex_unlock(&req->r_fill_mutex);
3460             aborted = true;
3461             pr_warn_ratelimited("forward tid %llu seq overflow\n",
3462                         tid);
3463         } else {
3464             dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3465                  tid, next_mds, req->r_num_fwd, fwd_seq);
3466         }
3467     } else {
3468         /* resend. forward race not possible; mds would drop */
3469         dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3470         BUG_ON(req->r_err);
3471         BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3472         req->r_attempts = 0;
3473         req->r_num_fwd = fwd_seq;
3474         req->r_resend_mds = next_mds;
3475         put_request_session(req);
3476         __do_request(mdsc, req);
3477     }
3478     mutex_unlock(&mdsc->mutex);
3479 
3480     /* kick calling process */
3481     if (aborted)
3482         complete_request(mdsc, req);
3483     ceph_mdsc_put_request(req);
3484     return;
3485 
3486 bad:
3487     pr_err("mdsc_handle_forward decode error err=%d\n", err);
3488 }
3489 
3490 static int __decode_session_metadata(void **p, void *end,
3491                      bool *blocklisted)
3492 {
3493     /* map<string,string> */
3494     u32 n;
3495     bool err_str;
3496     ceph_decode_32_safe(p, end, n, bad);
3497     while (n-- > 0) {
3498         u32 len;
3499         ceph_decode_32_safe(p, end, len, bad);
3500         ceph_decode_need(p, end, len, bad);
3501         err_str = !strncmp(*p, "error_string", len);
3502         *p += len;
3503         ceph_decode_32_safe(p, end, len, bad);
3504         ceph_decode_need(p, end, len, bad);
3505         /*
3506          * Match "blocklisted (blacklisted)" from newer MDSes,
3507          * or "blacklisted" from older MDSes.
3508          */
3509         if (err_str && strnstr(*p, "blacklisted", len))
3510             *blocklisted = true;
3511         *p += len;
3512     }
3513     return 0;
3514 bad:
3515     return -1;
3516 }
3517 
3518 /*
3519  * handle a mds session control message
3520  */
3521 static void handle_session(struct ceph_mds_session *session,
3522                struct ceph_msg *msg)
3523 {
3524     struct ceph_mds_client *mdsc = session->s_mdsc;
3525     int mds = session->s_mds;
3526     int msg_version = le16_to_cpu(msg->hdr.version);
3527     void *p = msg->front.iov_base;
3528     void *end = p + msg->front.iov_len;
3529     struct ceph_mds_session_head *h;
3530     u32 op;
3531     u64 seq, features = 0;
3532     int wake = 0;
3533     bool blocklisted = false;
3534 
3535     /* decode */
3536     ceph_decode_need(&p, end, sizeof(*h), bad);
3537     h = p;
3538     p += sizeof(*h);
3539 
3540     op = le32_to_cpu(h->op);
3541     seq = le64_to_cpu(h->seq);
3542 
3543     if (msg_version >= 3) {
3544         u32 len;
3545         /* version >= 2 and < 5, decode metadata, skip otherwise
3546          * as it's handled via flags.
3547          */
3548         if (msg_version >= 5)
3549             ceph_decode_skip_map(&p, end, string, string, bad);
3550         else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3551             goto bad;
3552 
3553         /* version >= 3, feature bits */
3554         ceph_decode_32_safe(&p, end, len, bad);
3555         if (len) {
3556             ceph_decode_64_safe(&p, end, features, bad);
3557             p += len - sizeof(features);
3558         }
3559     }
3560 
3561     if (msg_version >= 5) {
3562         u32 flags, len;
3563 
3564         /* version >= 4 */
3565         ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
3566         ceph_decode_32_safe(&p, end, len, bad); /* len */
3567         ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
3568 
3569         /* version >= 5, flags   */
3570         ceph_decode_32_safe(&p, end, flags, bad);
3571         if (flags & CEPH_SESSION_BLOCKLISTED) {
3572             pr_warn("mds%d session blocklisted\n", session->s_mds);
3573             blocklisted = true;
3574         }
3575     }
3576 
3577     mutex_lock(&mdsc->mutex);
3578     if (op == CEPH_SESSION_CLOSE) {
3579         ceph_get_mds_session(session);
3580         __unregister_session(mdsc, session);
3581     }
3582     /* FIXME: this ttl calculation is generous */
3583     session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3584     mutex_unlock(&mdsc->mutex);
3585 
3586     mutex_lock(&session->s_mutex);
3587 
3588     dout("handle_session mds%d %s %p state %s seq %llu\n",
3589          mds, ceph_session_op_name(op), session,
3590          ceph_session_state_name(session->s_state), seq);
3591 
3592     if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3593         session->s_state = CEPH_MDS_SESSION_OPEN;
3594         pr_info("mds%d came back\n", session->s_mds);
3595     }
3596 
3597     switch (op) {
3598     case CEPH_SESSION_OPEN:
3599         if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3600             pr_info("mds%d reconnect success\n", session->s_mds);
3601 
3602         if (session->s_state == CEPH_MDS_SESSION_OPEN) {
3603             pr_notice("mds%d is already opened\n", session->s_mds);
3604         } else {
3605             session->s_state = CEPH_MDS_SESSION_OPEN;
3606             session->s_features = features;
3607             renewed_caps(mdsc, session, 0);
3608             if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
3609                      &session->s_features))
3610                 metric_schedule_delayed(&mdsc->metric);
3611         }
3612 
3613         /*
3614          * The connection maybe broken and the session in client
3615          * side has been reinitialized, need to update the seq
3616          * anyway.
3617          */
3618         if (!session->s_seq && seq)
3619             session->s_seq = seq;
3620 
3621         wake = 1;
3622         if (mdsc->stopping)
3623             __close_session(mdsc, session);
3624         break;
3625 
3626     case CEPH_SESSION_RENEWCAPS:
3627         if (session->s_renew_seq == seq)
3628             renewed_caps(mdsc, session, 1);
3629         break;
3630 
3631     case CEPH_SESSION_CLOSE:
3632         if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3633             pr_info("mds%d reconnect denied\n", session->s_mds);
3634         session->s_state = CEPH_MDS_SESSION_CLOSED;
3635         cleanup_session_requests(mdsc, session);
3636         remove_session_caps(session);
3637         wake = 2; /* for good measure */
3638         wake_up_all(&mdsc->session_close_wq);
3639         break;
3640 
3641     case CEPH_SESSION_STALE:
3642         pr_info("mds%d caps went stale, renewing\n",
3643             session->s_mds);
3644         atomic_inc(&session->s_cap_gen);
3645         session->s_cap_ttl = jiffies - 1;
3646         send_renew_caps(mdsc, session);
3647         break;
3648 
3649     case CEPH_SESSION_RECALL_STATE:
3650         ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3651         break;
3652 
3653     case CEPH_SESSION_FLUSHMSG:
3654         send_flushmsg_ack(mdsc, session, seq);
3655         break;
3656 
3657     case CEPH_SESSION_FORCE_RO:
3658         dout("force_session_readonly %p\n", session);
3659         spin_lock(&session->s_cap_lock);
3660         session->s_readonly = true;
3661         spin_unlock(&session->s_cap_lock);
3662         wake_up_session_caps(session, FORCE_RO);
3663         break;
3664 
3665     case CEPH_SESSION_REJECT:
3666         WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3667         pr_info("mds%d rejected session\n", session->s_mds);
3668         session->s_state = CEPH_MDS_SESSION_REJECTED;
3669         cleanup_session_requests(mdsc, session);
3670         remove_session_caps(session);
3671         if (blocklisted)
3672             mdsc->fsc->blocklisted = true;
3673         wake = 2; /* for good measure */
3674         break;
3675 
3676     default:
3677         pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3678         WARN_ON(1);
3679     }
3680 
3681     mutex_unlock(&session->s_mutex);
3682     if (wake) {
3683         mutex_lock(&mdsc->mutex);
3684         __wake_requests(mdsc, &session->s_waiting);
3685         if (wake == 2)
3686             kick_requests(mdsc, mds);
3687         mutex_unlock(&mdsc->mutex);
3688     }
3689     if (op == CEPH_SESSION_CLOSE)
3690         ceph_put_mds_session(session);
3691     return;
3692 
3693 bad:
3694     pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3695            (int)msg->front.iov_len);
3696     ceph_msg_dump(msg);
3697     return;
3698 }
3699 
3700 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3701 {
3702     int dcaps;
3703 
3704     dcaps = xchg(&req->r_dir_caps, 0);
3705     if (dcaps) {
3706         dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3707         ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3708     }
3709 }
3710 
3711 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3712 {
3713     int dcaps;
3714 
3715     dcaps = xchg(&req->r_dir_caps, 0);
3716     if (dcaps) {
3717         dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3718         ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3719                         dcaps);
3720     }
3721 }
3722 
3723 /*
3724  * called under session->mutex.
3725  */
3726 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3727                    struct ceph_mds_session *session)
3728 {
3729     struct ceph_mds_request *req, *nreq;
3730     struct rb_node *p;
3731 
3732     dout("replay_unsafe_requests mds%d\n", session->s_mds);
3733 
3734     mutex_lock(&mdsc->mutex);
3735     list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3736         __send_request(session, req, true);
3737 
3738     /*
3739      * also re-send old requests when MDS enters reconnect stage. So that MDS
3740      * can process completed request in clientreplay stage.
3741      */
3742     p = rb_first(&mdsc->request_tree);
3743     while (p) {
3744         req = rb_entry(p, struct ceph_mds_request, r_node);
3745         p = rb_next(p);
3746         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3747             continue;
3748         if (req->r_attempts == 0)
3749             continue; /* only old requests */
3750         if (!req->r_session)
3751             continue;
3752         if (req->r_session->s_mds != session->s_mds)
3753             continue;
3754 
3755         ceph_mdsc_release_dir_caps_no_check(req);
3756 
3757         __send_request(session, req, true);
3758     }
3759     mutex_unlock(&mdsc->mutex);
3760 }
3761 
3762 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3763 {
3764     struct ceph_msg *reply;
3765     struct ceph_pagelist *_pagelist;
3766     struct page *page;
3767     __le32 *addr;
3768     int err = -ENOMEM;
3769 
3770     if (!recon_state->allow_multi)
3771         return -ENOSPC;
3772 
3773     /* can't handle message that contains both caps and realm */
3774     BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3775 
3776     /* pre-allocate new pagelist */
3777     _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3778     if (!_pagelist)
3779         return -ENOMEM;
3780 
3781     reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3782     if (!reply)
3783         goto fail_msg;
3784 
3785     /* placeholder for nr_caps */
3786     err = ceph_pagelist_encode_32(_pagelist, 0);
3787     if (err < 0)
3788         goto fail;
3789 
3790     if (recon_state->nr_caps) {
3791         /* currently encoding caps */
3792         err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3793         if (err)
3794             goto fail;
3795     } else {
3796         /* placeholder for nr_realms (currently encoding relams) */
3797         err = ceph_pagelist_encode_32(_pagelist, 0);
3798         if (err < 0)
3799             goto fail;
3800     }
3801 
3802     err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3803     if (err)
3804         goto fail;
3805 
3806     page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3807     addr = kmap_atomic(page);
3808     if (recon_state->nr_caps) {
3809         /* currently encoding caps */
3810         *addr = cpu_to_le32(recon_state->nr_caps);
3811     } else {
3812         /* currently encoding relams */
3813         *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3814     }
3815     kunmap_atomic(addr);
3816 
3817     reply->hdr.version = cpu_to_le16(5);
3818     reply->hdr.compat_version = cpu_to_le16(4);
3819 
3820     reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3821     ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3822 
3823     ceph_con_send(&recon_state->session->s_con, reply);
3824     ceph_pagelist_release(recon_state->pagelist);
3825 
3826     recon_state->pagelist = _pagelist;
3827     recon_state->nr_caps = 0;
3828     recon_state->nr_realms = 0;
3829     recon_state->msg_version = 5;
3830     return 0;
3831 fail:
3832     ceph_msg_put(reply);
3833 fail_msg:
3834     ceph_pagelist_release(_pagelist);
3835     return err;
3836 }
3837 
3838 static struct dentry* d_find_primary(struct inode *inode)
3839 {
3840     struct dentry *alias, *dn = NULL;
3841 
3842     if (hlist_empty(&inode->i_dentry))
3843         return NULL;
3844 
3845     spin_lock(&inode->i_lock);
3846     if (hlist_empty(&inode->i_dentry))
3847         goto out_unlock;
3848 
3849     if (S_ISDIR(inode->i_mode)) {
3850         alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3851         if (!IS_ROOT(alias))
3852             dn = dget(alias);
3853         goto out_unlock;
3854     }
3855 
3856     hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3857         spin_lock(&alias->d_lock);
3858         if (!d_unhashed(alias) &&
3859             (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3860             dn = dget_dlock(alias);
3861         }
3862         spin_unlock(&alias->d_lock);
3863         if (dn)
3864             break;
3865     }
3866 out_unlock:
3867     spin_unlock(&inode->i_lock);
3868     return dn;
3869 }
3870 
3871 /*
3872  * Encode information about a cap for a reconnect with the MDS.
3873  */
3874 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3875               void *arg)
3876 {
3877     union {
3878         struct ceph_mds_cap_reconnect v2;
3879         struct ceph_mds_cap_reconnect_v1 v1;
3880     } rec;
3881     struct ceph_inode_info *ci = cap->ci;
3882     struct ceph_reconnect_state *recon_state = arg;
3883     struct ceph_pagelist *pagelist = recon_state->pagelist;
3884     struct dentry *dentry;
3885     char *path;
3886     int pathlen = 0, err;
3887     u64 pathbase;
3888     u64 snap_follows;
3889 
3890     dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3891          inode, ceph_vinop(inode), cap, cap->cap_id,
3892          ceph_cap_string(cap->issued));
3893 
3894     dentry = d_find_primary(inode);
3895     if (dentry) {
3896         /* set pathbase to parent dir when msg_version >= 2 */
3897         path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3898                         recon_state->msg_version >= 2);
3899         dput(dentry);
3900         if (IS_ERR(path)) {
3901             err = PTR_ERR(path);
3902             goto out_err;
3903         }
3904     } else {
3905         path = NULL;
3906         pathbase = 0;
3907     }
3908 
3909     spin_lock(&ci->i_ceph_lock);
3910     cap->seq = 0;        /* reset cap seq */
3911     cap->issue_seq = 0;  /* and issue_seq */
3912     cap->mseq = 0;       /* and migrate_seq */
3913     cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
3914 
3915     /* These are lost when the session goes away */
3916     if (S_ISDIR(inode->i_mode)) {
3917         if (cap->issued & CEPH_CAP_DIR_CREATE) {
3918             ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3919             memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3920         }
3921         cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3922     }
3923 
3924     if (recon_state->msg_version >= 2) {
3925         rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3926         rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3927         rec.v2.issued = cpu_to_le32(cap->issued);
3928         rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3929         rec.v2.pathbase = cpu_to_le64(pathbase);
3930         rec.v2.flock_len = (__force __le32)
3931             ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3932     } else {
3933         rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3934         rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3935         rec.v1.issued = cpu_to_le32(cap->issued);
3936         rec.v1.size = cpu_to_le64(i_size_read(inode));
3937         ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3938         ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3939         rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3940         rec.v1.pathbase = cpu_to_le64(pathbase);
3941     }
3942 
3943     if (list_empty(&ci->i_cap_snaps)) {
3944         snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3945     } else {
3946         struct ceph_cap_snap *capsnap =
3947             list_first_entry(&ci->i_cap_snaps,
3948                      struct ceph_cap_snap, ci_item);
3949         snap_follows = capsnap->follows;
3950     }
3951     spin_unlock(&ci->i_ceph_lock);
3952 
3953     if (recon_state->msg_version >= 2) {
3954         int num_fcntl_locks, num_flock_locks;
3955         struct ceph_filelock *flocks = NULL;
3956         size_t struct_len, total_len = sizeof(u64);
3957         u8 struct_v = 0;
3958 
3959 encode_again:
3960         if (rec.v2.flock_len) {
3961             ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3962         } else {
3963             num_fcntl_locks = 0;
3964             num_flock_locks = 0;
3965         }
3966         if (num_fcntl_locks + num_flock_locks > 0) {
3967             flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3968                            sizeof(struct ceph_filelock),
3969                            GFP_NOFS);
3970             if (!flocks) {
3971                 err = -ENOMEM;
3972                 goto out_err;
3973             }
3974             err = ceph_encode_locks_to_buffer(inode, flocks,
3975                               num_fcntl_locks,
3976                               num_flock_locks);
3977             if (err) {
3978                 kfree(flocks);
3979                 flocks = NULL;
3980                 if (err == -ENOSPC)
3981                     goto encode_again;
3982                 goto out_err;
3983             }
3984         } else {
3985             kfree(flocks);
3986             flocks = NULL;
3987         }
3988 
3989         if (recon_state->msg_version >= 3) {
3990             /* version, compat_version and struct_len */
3991             total_len += 2 * sizeof(u8) + sizeof(u32);
3992             struct_v = 2;
3993         }
3994         /*
3995          * number of encoded locks is stable, so copy to pagelist
3996          */
3997         struct_len = 2 * sizeof(u32) +
3998                 (num_fcntl_locks + num_flock_locks) *
3999                 sizeof(struct ceph_filelock);
4000         rec.v2.flock_len = cpu_to_le32(struct_len);
4001 
4002         struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
4003 
4004         if (struct_v >= 2)
4005             struct_len += sizeof(u64); /* snap_follows */
4006 
4007         total_len += struct_len;
4008 
4009         if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4010             err = send_reconnect_partial(recon_state);
4011             if (err)
4012                 goto out_freeflocks;
4013             pagelist = recon_state->pagelist;
4014         }
4015 
4016         err = ceph_pagelist_reserve(pagelist, total_len);
4017         if (err)
4018             goto out_freeflocks;
4019 
4020         ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4021         if (recon_state->msg_version >= 3) {
4022             ceph_pagelist_encode_8(pagelist, struct_v);
4023             ceph_pagelist_encode_8(pagelist, 1);
4024             ceph_pagelist_encode_32(pagelist, struct_len);
4025         }
4026         ceph_pagelist_encode_string(pagelist, path, pathlen);
4027         ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4028         ceph_locks_to_pagelist(flocks, pagelist,
4029                        num_fcntl_locks, num_flock_locks);
4030         if (struct_v >= 2)
4031             ceph_pagelist_encode_64(pagelist, snap_follows);
4032 out_freeflocks:
4033         kfree(flocks);
4034     } else {
4035         err = ceph_pagelist_reserve(pagelist,
4036                         sizeof(u64) + sizeof(u32) +
4037                         pathlen + sizeof(rec.v1));
4038         if (err)
4039             goto out_err;
4040 
4041         ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4042         ceph_pagelist_encode_string(pagelist, path, pathlen);
4043         ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
4044     }
4045 
4046 out_err:
4047     ceph_mdsc_free_path(path, pathlen);
4048     if (!err)
4049         recon_state->nr_caps++;
4050     return err;
4051 }
4052 
4053 static int encode_snap_realms(struct ceph_mds_client *mdsc,
4054                   struct ceph_reconnect_state *recon_state)
4055 {
4056     struct rb_node *p;
4057     struct ceph_pagelist *pagelist = recon_state->pagelist;
4058     int err = 0;
4059 
4060     if (recon_state->msg_version >= 4) {
4061         err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4062         if (err < 0)
4063             goto fail;
4064     }
4065 
4066     /*
4067      * snaprealms.  we provide mds with the ino, seq (version), and
4068      * parent for all of our realms.  If the mds has any newer info,
4069      * it will tell us.
4070      */
4071     for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4072         struct ceph_snap_realm *realm =
4073                rb_entry(p, struct ceph_snap_realm, node);
4074         struct ceph_mds_snaprealm_reconnect sr_rec;
4075 
4076         if (recon_state->msg_version >= 4) {
4077             size_t need = sizeof(u8) * 2 + sizeof(u32) +
4078                       sizeof(sr_rec);
4079 
4080             if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4081                 err = send_reconnect_partial(recon_state);
4082                 if (err)
4083                     goto fail;
4084                 pagelist = recon_state->pagelist;
4085             }
4086 
4087             err = ceph_pagelist_reserve(pagelist, need);
4088             if (err)
4089                 goto fail;
4090 
4091             ceph_pagelist_encode_8(pagelist, 1);
4092             ceph_pagelist_encode_8(pagelist, 1);
4093             ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4094         }
4095 
4096         dout(" adding snap realm %llx seq %lld parent %llx\n",
4097              realm->ino, realm->seq, realm->parent_ino);
4098         sr_rec.ino = cpu_to_le64(realm->ino);
4099         sr_rec.seq = cpu_to_le64(realm->seq);
4100         sr_rec.parent = cpu_to_le64(realm->parent_ino);
4101 
4102         err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4103         if (err)
4104             goto fail;
4105 
4106         recon_state->nr_realms++;
4107     }
4108 fail:
4109     return err;
4110 }
4111 
4112 
4113 /*
4114  * If an MDS fails and recovers, clients need to reconnect in order to
4115  * reestablish shared state.  This includes all caps issued through
4116  * this session _and_ the snap_realm hierarchy.  Because it's not
4117  * clear which snap realms the mds cares about, we send everything we
4118  * know about.. that ensures we'll then get any new info the
4119  * recovering MDS might have.
4120  *
4121  * This is a relatively heavyweight operation, but it's rare.
4122  */
4123 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4124                    struct ceph_mds_session *session)
4125 {
4126     struct ceph_msg *reply;
4127     int mds = session->s_mds;
4128     int err = -ENOMEM;
4129     struct ceph_reconnect_state recon_state = {
4130         .session = session,
4131     };
4132     LIST_HEAD(dispose);
4133 
4134     pr_info("mds%d reconnect start\n", mds);
4135 
4136     recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4137     if (!recon_state.pagelist)
4138         goto fail_nopagelist;
4139 
4140     reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4141     if (!reply)
4142         goto fail_nomsg;
4143 
4144     xa_destroy(&session->s_delegated_inos);
4145 
4146     mutex_lock(&session->s_mutex);
4147     session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4148     session->s_seq = 0;
4149 
4150     dout("session %p state %s\n", session,
4151          ceph_session_state_name(session->s_state));
4152 
4153     atomic_inc(&session->s_cap_gen);
4154 
4155     spin_lock(&session->s_cap_lock);
4156     /* don't know if session is readonly */
4157     session->s_readonly = 0;
4158     /*
4159      * notify __ceph_remove_cap() that we are composing cap reconnect.
4160      * If a cap get released before being added to the cap reconnect,
4161      * __ceph_remove_cap() should skip queuing cap release.
4162      */
4163     session->s_cap_reconnect = 1;
4164     /* drop old cap expires; we're about to reestablish that state */
4165     detach_cap_releases(session, &dispose);
4166     spin_unlock(&session->s_cap_lock);
4167     dispose_cap_releases(mdsc, &dispose);
4168 
4169     /* trim unused caps to reduce MDS's cache rejoin time */
4170     if (mdsc->fsc->sb->s_root)
4171         shrink_dcache_parent(mdsc->fsc->sb->s_root);
4172 
4173     ceph_con_close(&session->s_con);
4174     ceph_con_open(&session->s_con,
4175               CEPH_ENTITY_TYPE_MDS, mds,
4176               ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4177 
4178     /* replay unsafe requests */
4179     replay_unsafe_requests(mdsc, session);
4180 
4181     ceph_early_kick_flushing_caps(mdsc, session);
4182 
4183     down_read(&mdsc->snap_rwsem);
4184 
4185     /* placeholder for nr_caps */
4186     err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4187     if (err)
4188         goto fail;
4189 
4190     if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4191         recon_state.msg_version = 3;
4192         recon_state.allow_multi = true;
4193     } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4194         recon_state.msg_version = 3;
4195     } else {
4196         recon_state.msg_version = 2;
4197     }
4198     /* trsaverse this session's caps */
4199     err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4200 
4201     spin_lock(&session->s_cap_lock);
4202     session->s_cap_reconnect = 0;
4203     spin_unlock(&session->s_cap_lock);
4204 
4205     if (err < 0)
4206         goto fail;
4207 
4208     /* check if all realms can be encoded into current message */
4209     if (mdsc->num_snap_realms) {
4210         size_t total_len =
4211             recon_state.pagelist->length +
4212             mdsc->num_snap_realms *
4213             sizeof(struct ceph_mds_snaprealm_reconnect);
4214         if (recon_state.msg_version >= 4) {
4215             /* number of realms */
4216             total_len += sizeof(u32);
4217             /* version, compat_version and struct_len */
4218             total_len += mdsc->num_snap_realms *
4219                      (2 * sizeof(u8) + sizeof(u32));
4220         }
4221         if (total_len > RECONNECT_MAX_SIZE) {
4222             if (!recon_state.allow_multi) {
4223                 err = -ENOSPC;
4224                 goto fail;
4225             }
4226             if (recon_state.nr_caps) {
4227                 err = send_reconnect_partial(&recon_state);
4228                 if (err)
4229                     goto fail;
4230             }
4231             recon_state.msg_version = 5;
4232         }
4233     }
4234 
4235     err = encode_snap_realms(mdsc, &recon_state);
4236     if (err < 0)
4237         goto fail;
4238 
4239     if (recon_state.msg_version >= 5) {
4240         err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4241         if (err < 0)
4242             goto fail;
4243     }
4244 
4245     if (recon_state.nr_caps || recon_state.nr_realms) {
4246         struct page *page =
4247             list_first_entry(&recon_state.pagelist->head,
4248                     struct page, lru);
4249         __le32 *addr = kmap_atomic(page);
4250         if (recon_state.nr_caps) {
4251             WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4252             *addr = cpu_to_le32(recon_state.nr_caps);
4253         } else if (recon_state.msg_version >= 4) {
4254             *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4255         }
4256         kunmap_atomic(addr);
4257     }
4258 
4259     reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4260     if (recon_state.msg_version >= 4)
4261         reply->hdr.compat_version = cpu_to_le16(4);
4262 
4263     reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4264     ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4265 
4266     ceph_con_send(&session->s_con, reply);
4267 
4268     mutex_unlock(&session->s_mutex);
4269 
4270     mutex_lock(&mdsc->mutex);
4271     __wake_requests(mdsc, &session->s_waiting);
4272     mutex_unlock(&mdsc->mutex);
4273 
4274     up_read(&mdsc->snap_rwsem);
4275     ceph_pagelist_release(recon_state.pagelist);
4276     return;
4277 
4278 fail:
4279     ceph_msg_put(reply);
4280     up_read(&mdsc->snap_rwsem);
4281     mutex_unlock(&session->s_mutex);
4282 fail_nomsg:
4283     ceph_pagelist_release(recon_state.pagelist);
4284 fail_nopagelist:
4285     pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4286     return;
4287 }
4288 
4289 
4290 /*
4291  * compare old and new mdsmaps, kicking requests
4292  * and closing out old connections as necessary
4293  *
4294  * called under mdsc->mutex.
4295  */
4296 static void check_new_map(struct ceph_mds_client *mdsc,
4297               struct ceph_mdsmap *newmap,
4298               struct ceph_mdsmap *oldmap)
4299 {
4300     int i, j, err;
4301     int oldstate, newstate;
4302     struct ceph_mds_session *s;
4303     unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
4304 
4305     dout("check_new_map new %u old %u\n",
4306          newmap->m_epoch, oldmap->m_epoch);
4307 
4308     if (newmap->m_info) {
4309         for (i = 0; i < newmap->possible_max_rank; i++) {
4310             for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
4311                 set_bit(newmap->m_info[i].export_targets[j], targets);
4312         }
4313     }
4314 
4315     for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4316         if (!mdsc->sessions[i])
4317             continue;
4318         s = mdsc->sessions[i];
4319         oldstate = ceph_mdsmap_get_state(oldmap, i);
4320         newstate = ceph_mdsmap_get_state(newmap, i);
4321 
4322         dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4323              i, ceph_mds_state_name(oldstate),
4324              ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4325              ceph_mds_state_name(newstate),
4326              ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4327              ceph_session_state_name(s->s_state));
4328 
4329         if (i >= newmap->possible_max_rank) {
4330             /* force close session for stopped mds */
4331             ceph_get_mds_session(s);
4332             __unregister_session(mdsc, s);
4333             __wake_requests(mdsc, &s->s_waiting);
4334             mutex_unlock(&mdsc->mutex);
4335 
4336             mutex_lock(&s->s_mutex);
4337             cleanup_session_requests(mdsc, s);
4338             remove_session_caps(s);
4339             mutex_unlock(&s->s_mutex);
4340 
4341             ceph_put_mds_session(s);
4342 
4343             mutex_lock(&mdsc->mutex);
4344             kick_requests(mdsc, i);
4345             continue;
4346         }
4347 
4348         if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4349                ceph_mdsmap_get_addr(newmap, i),
4350                sizeof(struct ceph_entity_addr))) {
4351             /* just close it */
4352             mutex_unlock(&mdsc->mutex);
4353             mutex_lock(&s->s_mutex);
4354             mutex_lock(&mdsc->mutex);
4355             ceph_con_close(&s->s_con);
4356             mutex_unlock(&s->s_mutex);
4357             s->s_state = CEPH_MDS_SESSION_RESTARTING;
4358         } else if (oldstate == newstate) {
4359             continue;  /* nothing new with this mds */
4360         }
4361 
4362         /*
4363          * send reconnect?
4364          */
4365         if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4366             newstate >= CEPH_MDS_STATE_RECONNECT) {
4367             mutex_unlock(&mdsc->mutex);
4368             clear_bit(i, targets);
4369             send_mds_reconnect(mdsc, s);
4370             mutex_lock(&mdsc->mutex);
4371         }
4372 
4373         /*
4374          * kick request on any mds that has gone active.
4375          */
4376         if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4377             newstate >= CEPH_MDS_STATE_ACTIVE) {
4378             if (oldstate != CEPH_MDS_STATE_CREATING &&
4379                 oldstate != CEPH_MDS_STATE_STARTING)
4380                 pr_info("mds%d recovery completed\n", s->s_mds);
4381             kick_requests(mdsc, i);
4382             mutex_unlock(&mdsc->mutex);
4383             mutex_lock(&s->s_mutex);
4384             mutex_lock(&mdsc->mutex);
4385             ceph_kick_flushing_caps(mdsc, s);
4386             mutex_unlock(&s->s_mutex);
4387             wake_up_session_caps(s, RECONNECT);
4388         }
4389     }
4390 
4391     /*
4392      * Only open and reconnect sessions that don't exist yet.
4393      */
4394     for (i = 0; i < newmap->possible_max_rank; i++) {
4395         /*
4396          * In case the import MDS is crashed just after
4397          * the EImportStart journal is flushed, so when
4398          * a standby MDS takes over it and is replaying
4399          * the EImportStart journal the new MDS daemon
4400          * will wait the client to reconnect it, but the
4401          * client may never register/open the session yet.
4402          *
4403          * Will try to reconnect that MDS daemon if the
4404          * rank number is in the export targets array and
4405          * is the up:reconnect state.
4406          */
4407         newstate = ceph_mdsmap_get_state(newmap, i);
4408         if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
4409             continue;
4410 
4411         /*
4412          * The session maybe registered and opened by some
4413          * requests which were choosing random MDSes during
4414          * the mdsc->mutex's unlock/lock gap below in rare
4415          * case. But the related MDS daemon will just queue
4416          * that requests and be still waiting for the client's
4417          * reconnection request in up:reconnect state.
4418          */
4419         s = __ceph_lookup_mds_session(mdsc, i);
4420         if (likely(!s)) {
4421             s = __open_export_target_session(mdsc, i);
4422             if (IS_ERR(s)) {
4423                 err = PTR_ERR(s);
4424                 pr_err("failed to open export target session, err %d\n",
4425                        err);
4426                 continue;
4427             }
4428         }
4429         dout("send reconnect to export target mds.%d\n", i);
4430         mutex_unlock(&mdsc->mutex);
4431         send_mds_reconnect(mdsc, s);
4432         ceph_put_mds_session(s);
4433         mutex_lock(&mdsc->mutex);
4434     }
4435 
4436     for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4437         s = mdsc->sessions[i];
4438         if (!s)
4439             continue;
4440         if (!ceph_mdsmap_is_laggy(newmap, i))
4441             continue;
4442         if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4443             s->s_state == CEPH_MDS_SESSION_HUNG ||
4444             s->s_state == CEPH_MDS_SESSION_CLOSING) {
4445             dout(" connecting to export targets of laggy mds%d\n",
4446                  i);
4447             __open_export_target_sessions(mdsc, s);
4448         }
4449     }
4450 }
4451 
4452 
4453 
4454 /*
4455  * leases
4456  */
4457 
4458 /*
4459  * caller must hold session s_mutex, dentry->d_lock
4460  */
4461 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4462 {
4463     struct ceph_dentry_info *di = ceph_dentry(dentry);
4464 
4465     ceph_put_mds_session(di->lease_session);
4466     di->lease_session = NULL;
4467 }
4468 
4469 static void handle_lease(struct ceph_mds_client *mdsc,
4470              struct ceph_mds_session *session,
4471              struct ceph_msg *msg)
4472 {
4473     struct super_block *sb = mdsc->fsc->sb;
4474     struct inode *inode;
4475     struct dentry *parent, *dentry;
4476     struct ceph_dentry_info *di;
4477     int mds = session->s_mds;
4478     struct ceph_mds_lease *h = msg->front.iov_base;
4479     u32 seq;
4480     struct ceph_vino vino;
4481     struct qstr dname;
4482     int release = 0;
4483 
4484     dout("handle_lease from mds%d\n", mds);
4485 
4486     /* decode */
4487     if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4488         goto bad;
4489     vino.ino = le64_to_cpu(h->ino);
4490     vino.snap = CEPH_NOSNAP;
4491     seq = le32_to_cpu(h->seq);
4492     dname.len = get_unaligned_le32(h + 1);
4493     if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4494         goto bad;
4495     dname.name = (void *)(h + 1) + sizeof(u32);
4496 
4497     /* lookup inode */
4498     inode = ceph_find_inode(sb, vino);
4499     dout("handle_lease %s, ino %llx %p %.*s\n",
4500          ceph_lease_op_name(h->action), vino.ino, inode,
4501          dname.len, dname.name);
4502 
4503     mutex_lock(&session->s_mutex);
4504     inc_session_sequence(session);
4505 
4506     if (!inode) {
4507         dout("handle_lease no inode %llx\n", vino.ino);
4508         goto release;
4509     }
4510 
4511     /* dentry */
4512     parent = d_find_alias(inode);
4513     if (!parent) {
4514         dout("no parent dentry on inode %p\n", inode);
4515         WARN_ON(1);
4516         goto release;  /* hrm... */
4517     }
4518     dname.hash = full_name_hash(parent, dname.name, dname.len);
4519     dentry = d_lookup(parent, &dname);
4520     dput(parent);
4521     if (!dentry)
4522         goto release;
4523 
4524     spin_lock(&dentry->d_lock);
4525     di = ceph_dentry(dentry);
4526     switch (h->action) {
4527     case CEPH_MDS_LEASE_REVOKE:
4528         if (di->lease_session == session) {
4529             if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4530                 h->seq = cpu_to_le32(di->lease_seq);
4531             __ceph_mdsc_drop_dentry_lease(dentry);
4532         }
4533         release = 1;
4534         break;
4535 
4536     case CEPH_MDS_LEASE_RENEW:
4537         if (di->lease_session == session &&
4538             di->lease_gen == atomic_read(&session->s_cap_gen) &&
4539             di->lease_renew_from &&
4540             di->lease_renew_after == 0) {
4541             unsigned long duration =
4542                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4543 
4544             di->lease_seq = seq;
4545             di->time = di->lease_renew_from + duration;
4546             di->lease_renew_after = di->lease_renew_from +
4547                 (duration >> 1);
4548             di->lease_renew_from = 0;
4549         }
4550         break;
4551     }
4552     spin_unlock(&dentry->d_lock);
4553     dput(dentry);
4554 
4555     if (!release)
4556         goto out;
4557 
4558 release:
4559     /* let's just reuse the same message */
4560     h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4561     ceph_msg_get(msg);
4562     ceph_con_send(&session->s_con, msg);
4563 
4564 out:
4565     mutex_unlock(&session->s_mutex);
4566     iput(inode);
4567     return;
4568 
4569 bad:
4570     pr_err("corrupt lease message\n");
4571     ceph_msg_dump(msg);
4572 }
4573 
4574 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4575                   struct dentry *dentry, char action,
4576                   u32 seq)
4577 {
4578     struct ceph_msg *msg;
4579     struct ceph_mds_lease *lease;
4580     struct inode *dir;
4581     int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4582 
4583     dout("lease_send_msg identry %p %s to mds%d\n",
4584          dentry, ceph_lease_op_name(action), session->s_mds);
4585 
4586     msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4587     if (!msg)
4588         return;
4589     lease = msg->front.iov_base;
4590     lease->action = action;
4591     lease->seq = cpu_to_le32(seq);
4592 
4593     spin_lock(&dentry->d_lock);
4594     dir = d_inode(dentry->d_parent);
4595     lease->ino = cpu_to_le64(ceph_ino(dir));
4596     lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4597 
4598     put_unaligned_le32(dentry->d_name.len, lease + 1);
4599     memcpy((void *)(lease + 1) + 4,
4600            dentry->d_name.name, dentry->d_name.len);
4601     spin_unlock(&dentry->d_lock);
4602 
4603     ceph_con_send(&session->s_con, msg);
4604 }
4605 
4606 /*
4607  * lock unlock the session, to wait ongoing session activities
4608  */
4609 static void lock_unlock_session(struct ceph_mds_session *s)
4610 {
4611     mutex_lock(&s->s_mutex);
4612     mutex_unlock(&s->s_mutex);
4613 }
4614 
4615 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4616 {
4617     struct ceph_fs_client *fsc = mdsc->fsc;
4618 
4619     if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4620         return;
4621 
4622     if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4623         return;
4624 
4625     if (!READ_ONCE(fsc->blocklisted))
4626         return;
4627 
4628     pr_info("auto reconnect after blocklisted\n");
4629     ceph_force_reconnect(fsc->sb);
4630 }
4631 
4632 bool check_session_state(struct ceph_mds_session *s)
4633 {
4634     switch (s->s_state) {
4635     case CEPH_MDS_SESSION_OPEN:
4636         if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4637             s->s_state = CEPH_MDS_SESSION_HUNG;
4638             pr_info("mds%d hung\n", s->s_mds);
4639         }
4640         break;
4641     case CEPH_MDS_SESSION_CLOSING:
4642     case CEPH_MDS_SESSION_NEW:
4643     case CEPH_MDS_SESSION_RESTARTING:
4644     case CEPH_MDS_SESSION_CLOSED:
4645     case CEPH_MDS_SESSION_REJECTED:
4646         return false;
4647     }
4648 
4649     return true;
4650 }
4651 
4652 /*
4653  * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4654  * then we need to retransmit that request.
4655  */
4656 void inc_session_sequence(struct ceph_mds_session *s)
4657 {
4658     lockdep_assert_held(&s->s_mutex);
4659 
4660     s->s_seq++;
4661 
4662     if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4663         int ret;
4664 
4665         dout("resending session close request for mds%d\n", s->s_mds);
4666         ret = request_close_session(s);
4667         if (ret < 0)
4668             pr_err("unable to close session to mds%d: %d\n",
4669                    s->s_mds, ret);
4670     }
4671 }
4672 
4673 /*
4674  * delayed work -- periodically trim expired leases, renew caps with mds.  If
4675  * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4676  * workqueue delay value of 5 secs will be used.
4677  */
4678 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4679 {
4680     unsigned long max_delay = HZ * 5;
4681 
4682     /* 5 secs default delay */
4683     if (!delay || (delay > max_delay))
4684         delay = max_delay;
4685     schedule_delayed_work(&mdsc->delayed_work,
4686                   round_jiffies_relative(delay));
4687 }
4688 
4689 static void delayed_work(struct work_struct *work)
4690 {
4691     struct ceph_mds_client *mdsc =
4692         container_of(work, struct ceph_mds_client, delayed_work.work);
4693     unsigned long delay;
4694     int renew_interval;
4695     int renew_caps;
4696     int i;
4697 
4698     dout("mdsc delayed_work\n");
4699 
4700     if (mdsc->stopping)
4701         return;
4702 
4703     mutex_lock(&mdsc->mutex);
4704     renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4705     renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4706                    mdsc->last_renew_caps);
4707     if (renew_caps)
4708         mdsc->last_renew_caps = jiffies;
4709 
4710     for (i = 0; i < mdsc->max_sessions; i++) {
4711         struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4712         if (!s)
4713             continue;
4714 
4715         if (!check_session_state(s)) {
4716             ceph_put_mds_session(s);
4717             continue;
4718         }
4719         mutex_unlock(&mdsc->mutex);
4720 
4721         mutex_lock(&s->s_mutex);
4722         if (renew_caps)
4723             send_renew_caps(mdsc, s);
4724         else
4725             ceph_con_keepalive(&s->s_con);
4726         if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4727             s->s_state == CEPH_MDS_SESSION_HUNG)
4728             ceph_send_cap_releases(mdsc, s);
4729         mutex_unlock(&s->s_mutex);
4730         ceph_put_mds_session(s);
4731 
4732         mutex_lock(&mdsc->mutex);
4733     }
4734     mutex_unlock(&mdsc->mutex);
4735 
4736     delay = ceph_check_delayed_caps(mdsc);
4737 
4738     ceph_queue_cap_reclaim_work(mdsc);
4739 
4740     ceph_trim_snapid_map(mdsc);
4741 
4742     maybe_recover_session(mdsc);
4743 
4744     schedule_delayed(mdsc, delay);
4745 }
4746 
4747 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4748 
4749 {
4750     struct ceph_mds_client *mdsc;
4751     int err;
4752 
4753     mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4754     if (!mdsc)
4755         return -ENOMEM;
4756     mdsc->fsc = fsc;
4757     mutex_init(&mdsc->mutex);
4758     mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4759     if (!mdsc->mdsmap) {
4760         err = -ENOMEM;
4761         goto err_mdsc;
4762     }
4763 
4764     init_completion(&mdsc->safe_umount_waiters);
4765     init_waitqueue_head(&mdsc->session_close_wq);
4766     INIT_LIST_HEAD(&mdsc->waiting_for_map);
4767     mdsc->quotarealms_inodes = RB_ROOT;
4768     mutex_init(&mdsc->quotarealms_inodes_mutex);
4769     init_rwsem(&mdsc->snap_rwsem);
4770     mdsc->snap_realms = RB_ROOT;
4771     INIT_LIST_HEAD(&mdsc->snap_empty);
4772     spin_lock_init(&mdsc->snap_empty_lock);
4773     mdsc->request_tree = RB_ROOT;
4774     INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4775     mdsc->last_renew_caps = jiffies;
4776     INIT_LIST_HEAD(&mdsc->cap_delay_list);
4777     INIT_LIST_HEAD(&mdsc->cap_wait_list);
4778     spin_lock_init(&mdsc->cap_delay_lock);
4779     INIT_LIST_HEAD(&mdsc->snap_flush_list);
4780     spin_lock_init(&mdsc->snap_flush_lock);
4781     mdsc->last_cap_flush_tid = 1;
4782     INIT_LIST_HEAD(&mdsc->cap_flush_list);
4783     INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4784     spin_lock_init(&mdsc->cap_dirty_lock);
4785     init_waitqueue_head(&mdsc->cap_flushing_wq);
4786     INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4787     err = ceph_metric_init(&mdsc->metric);
4788     if (err)
4789         goto err_mdsmap;
4790 
4791     spin_lock_init(&mdsc->dentry_list_lock);
4792     INIT_LIST_HEAD(&mdsc->dentry_leases);
4793     INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4794 
4795     ceph_caps_init(mdsc);
4796     ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4797 
4798     spin_lock_init(&mdsc->snapid_map_lock);
4799     mdsc->snapid_map_tree = RB_ROOT;
4800     INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4801 
4802     init_rwsem(&mdsc->pool_perm_rwsem);
4803     mdsc->pool_perm_tree = RB_ROOT;
4804 
4805     strscpy(mdsc->nodename, utsname()->nodename,
4806         sizeof(mdsc->nodename));
4807 
4808     fsc->mdsc = mdsc;
4809     return 0;
4810 
4811 err_mdsmap:
4812     kfree(mdsc->mdsmap);
4813 err_mdsc:
4814     kfree(mdsc);
4815     return err;
4816 }
4817 
4818 /*
4819  * Wait for safe replies on open mds requests.  If we time out, drop
4820  * all requests from the tree to avoid dangling dentry refs.
4821  */
4822 static void wait_requests(struct ceph_mds_client *mdsc)
4823 {
4824     struct ceph_options *opts = mdsc->fsc->client->options;
4825     struct ceph_mds_request *req;
4826 
4827     mutex_lock(&mdsc->mutex);
4828     if (__get_oldest_req(mdsc)) {
4829         mutex_unlock(&mdsc->mutex);
4830 
4831         dout("wait_requests waiting for requests\n");
4832         wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4833                     ceph_timeout_jiffies(opts->mount_timeout));
4834 
4835         /* tear down remaining requests */
4836         mutex_lock(&mdsc->mutex);
4837         while ((req = __get_oldest_req(mdsc))) {
4838             dout("wait_requests timed out on tid %llu\n",
4839                  req->r_tid);
4840             list_del_init(&req->r_wait);
4841             __unregister_request(mdsc, req);
4842         }
4843     }
4844     mutex_unlock(&mdsc->mutex);
4845     dout("wait_requests done\n");
4846 }
4847 
4848 void send_flush_mdlog(struct ceph_mds_session *s)
4849 {
4850     struct ceph_msg *msg;
4851 
4852     /*
4853      * Pre-luminous MDS crashes when it sees an unknown session request
4854      */
4855     if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4856         return;
4857 
4858     mutex_lock(&s->s_mutex);
4859     dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
4860          ceph_session_state_name(s->s_state), s->s_seq);
4861     msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4862                       s->s_seq);
4863     if (!msg) {
4864         pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4865                s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4866     } else {
4867         ceph_con_send(&s->s_con, msg);
4868     }
4869     mutex_unlock(&s->s_mutex);
4870 }
4871 
4872 /*
4873  * called before mount is ro, and before dentries are torn down.
4874  * (hmm, does this still race with new lookups?)
4875  */
4876 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4877 {
4878     dout("pre_umount\n");
4879     mdsc->stopping = 1;
4880 
4881     ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4882     ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4883     ceph_flush_dirty_caps(mdsc);
4884     wait_requests(mdsc);
4885 
4886     /*
4887      * wait for reply handlers to drop their request refs and
4888      * their inode/dcache refs
4889      */
4890     ceph_msgr_flush();
4891 
4892     ceph_cleanup_quotarealms_inodes(mdsc);
4893 }
4894 
4895 /*
4896  * flush the mdlog and wait for all write mds requests to flush.
4897  */
4898 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
4899                          u64 want_tid)
4900 {
4901     struct ceph_mds_request *req = NULL, *nextreq;
4902     struct ceph_mds_session *last_session = NULL;
4903     struct rb_node *n;
4904 
4905     mutex_lock(&mdsc->mutex);
4906     dout("%s want %lld\n", __func__, want_tid);
4907 restart:
4908     req = __get_oldest_req(mdsc);
4909     while (req && req->r_tid <= want_tid) {
4910         /* find next request */
4911         n = rb_next(&req->r_node);
4912         if (n)
4913             nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4914         else
4915             nextreq = NULL;
4916         if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4917             (req->r_op & CEPH_MDS_OP_WRITE)) {
4918             struct ceph_mds_session *s = req->r_session;
4919 
4920             if (!s) {
4921                 req = nextreq;
4922                 continue;
4923             }
4924 
4925             /* write op */
4926             ceph_mdsc_get_request(req);
4927             if (nextreq)
4928                 ceph_mdsc_get_request(nextreq);
4929             s = ceph_get_mds_session(s);
4930             mutex_unlock(&mdsc->mutex);
4931 
4932             /* send flush mdlog request to MDS */
4933             if (last_session != s) {
4934                 send_flush_mdlog(s);
4935                 ceph_put_mds_session(last_session);
4936                 last_session = s;
4937             } else {
4938                 ceph_put_mds_session(s);
4939             }
4940             dout("%s wait on %llu (want %llu)\n", __func__,
4941                  req->r_tid, want_tid);
4942             wait_for_completion(&req->r_safe_completion);
4943 
4944             mutex_lock(&mdsc->mutex);
4945             ceph_mdsc_put_request(req);
4946             if (!nextreq)
4947                 break;  /* next dne before, so we're done! */
4948             if (RB_EMPTY_NODE(&nextreq->r_node)) {
4949                 /* next request was removed from tree */
4950                 ceph_mdsc_put_request(nextreq);
4951                 goto restart;
4952             }
4953             ceph_mdsc_put_request(nextreq);  /* won't go away */
4954         }
4955         req = nextreq;
4956     }
4957     mutex_unlock(&mdsc->mutex);
4958     ceph_put_mds_session(last_session);
4959     dout("%s done\n", __func__);
4960 }
4961 
4962 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4963 {
4964     u64 want_tid, want_flush;
4965 
4966     if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
4967         return;
4968 
4969     dout("sync\n");
4970     mutex_lock(&mdsc->mutex);
4971     want_tid = mdsc->last_tid;
4972     mutex_unlock(&mdsc->mutex);
4973 
4974     ceph_flush_dirty_caps(mdsc);
4975     spin_lock(&mdsc->cap_dirty_lock);
4976     want_flush = mdsc->last_cap_flush_tid;
4977     if (!list_empty(&mdsc->cap_flush_list)) {
4978         struct ceph_cap_flush *cf =
4979             list_last_entry(&mdsc->cap_flush_list,
4980                     struct ceph_cap_flush, g_list);
4981         cf->wake = true;
4982     }
4983     spin_unlock(&mdsc->cap_dirty_lock);
4984 
4985     dout("sync want tid %lld flush_seq %lld\n",
4986          want_tid, want_flush);
4987 
4988     flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
4989     wait_caps_flush(mdsc, want_flush);
4990 }
4991 
4992 /*
4993  * true if all sessions are closed, or we force unmount
4994  */
4995 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4996 {
4997     if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4998         return true;
4999     return atomic_read(&mdsc->num_sessions) <= skipped;
5000 }
5001 
5002 /*
5003  * called after sb is ro.
5004  */
5005 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
5006 {
5007     struct ceph_options *opts = mdsc->fsc->client->options;
5008     struct ceph_mds_session *session;
5009     int i;
5010     int skipped = 0;
5011 
5012     dout("close_sessions\n");
5013 
5014     /* close sessions */
5015     mutex_lock(&mdsc->mutex);
5016     for (i = 0; i < mdsc->max_sessions; i++) {
5017         session = __ceph_lookup_mds_session(mdsc, i);
5018         if (!session)
5019             continue;
5020         mutex_unlock(&mdsc->mutex);
5021         mutex_lock(&session->s_mutex);
5022         if (__close_session(mdsc, session) <= 0)
5023             skipped++;
5024         mutex_unlock(&session->s_mutex);
5025         ceph_put_mds_session(session);
5026         mutex_lock(&mdsc->mutex);
5027     }
5028     mutex_unlock(&mdsc->mutex);
5029 
5030     dout("waiting for sessions to close\n");
5031     wait_event_timeout(mdsc->session_close_wq,
5032                done_closing_sessions(mdsc, skipped),
5033                ceph_timeout_jiffies(opts->mount_timeout));
5034 
5035     /* tear down remaining sessions */
5036     mutex_lock(&mdsc->mutex);
5037     for (i = 0; i < mdsc->max_sessions; i++) {
5038         if (mdsc->sessions[i]) {
5039             session = ceph_get_mds_session(mdsc->sessions[i]);
5040             __unregister_session(mdsc, session);
5041             mutex_unlock(&mdsc->mutex);
5042             mutex_lock(&session->s_mutex);
5043             remove_session_caps(session);
5044             mutex_unlock(&session->s_mutex);
5045             ceph_put_mds_session(session);
5046             mutex_lock(&mdsc->mutex);
5047         }
5048     }
5049     WARN_ON(!list_empty(&mdsc->cap_delay_list));
5050     mutex_unlock(&mdsc->mutex);
5051 
5052     ceph_cleanup_snapid_map(mdsc);
5053     ceph_cleanup_global_and_empty_realms(mdsc);
5054 
5055     cancel_work_sync(&mdsc->cap_reclaim_work);
5056     cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
5057 
5058     dout("stopped\n");
5059 }
5060 
5061 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
5062 {
5063     struct ceph_mds_session *session;
5064     int mds;
5065 
5066     dout("force umount\n");
5067 
5068     mutex_lock(&mdsc->mutex);
5069     for (mds = 0; mds < mdsc->max_sessions; mds++) {
5070         session = __ceph_lookup_mds_session(mdsc, mds);
5071         if (!session)
5072             continue;
5073 
5074         if (session->s_state == CEPH_MDS_SESSION_REJECTED)
5075             __unregister_session(mdsc, session);
5076         __wake_requests(mdsc, &session->s_waiting);
5077         mutex_unlock(&mdsc->mutex);
5078 
5079         mutex_lock(&session->s_mutex);
5080         __close_session(mdsc, session);
5081         if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
5082             cleanup_session_requests(mdsc, session);
5083             remove_session_caps(session);
5084         }
5085         mutex_unlock(&session->s_mutex);
5086         ceph_put_mds_session(session);
5087 
5088         mutex_lock(&mdsc->mutex);
5089         kick_requests(mdsc, mds);
5090     }
5091     __wake_requests(mdsc, &mdsc->waiting_for_map);
5092     mutex_unlock(&mdsc->mutex);
5093 }
5094 
5095 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
5096 {
5097     dout("stop\n");
5098     /*
5099      * Make sure the delayed work stopped before releasing
5100      * the resources.
5101      *
5102      * Because the cancel_delayed_work_sync() will only
5103      * guarantee that the work finishes executing. But the
5104      * delayed work will re-arm itself again after that.
5105      */
5106     flush_delayed_work(&mdsc->delayed_work);
5107 
5108     if (mdsc->mdsmap)
5109         ceph_mdsmap_destroy(mdsc->mdsmap);
5110     kfree(mdsc->sessions);
5111     ceph_caps_finalize(mdsc);
5112     ceph_pool_perm_destroy(mdsc);
5113 }
5114 
5115 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
5116 {
5117     struct ceph_mds_client *mdsc = fsc->mdsc;
5118     dout("mdsc_destroy %p\n", mdsc);
5119 
5120     if (!mdsc)
5121         return;
5122 
5123     /* flush out any connection work with references to us */
5124     ceph_msgr_flush();
5125 
5126     ceph_mdsc_stop(mdsc);
5127 
5128     ceph_metric_destroy(&mdsc->metric);
5129 
5130     fsc->mdsc = NULL;
5131     kfree(mdsc);
5132     dout("mdsc_destroy %p done\n", mdsc);
5133 }
5134 
5135 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5136 {
5137     struct ceph_fs_client *fsc = mdsc->fsc;
5138     const char *mds_namespace = fsc->mount_options->mds_namespace;
5139     void *p = msg->front.iov_base;
5140     void *end = p + msg->front.iov_len;
5141     u32 epoch;
5142     u32 num_fs;
5143     u32 mount_fscid = (u32)-1;
5144     int err = -EINVAL;
5145 
5146     ceph_decode_need(&p, end, sizeof(u32), bad);
5147     epoch = ceph_decode_32(&p);
5148 
5149     dout("handle_fsmap epoch %u\n", epoch);
5150 
5151     /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
5152     ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
5153 
5154     ceph_decode_32_safe(&p, end, num_fs, bad);
5155     while (num_fs-- > 0) {
5156         void *info_p, *info_end;
5157         u32 info_len;
5158         u32 fscid, namelen;
5159 
5160         ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
5161         p += 2;     // info_v, info_cv
5162         info_len = ceph_decode_32(&p);
5163         ceph_decode_need(&p, end, info_len, bad);
5164         info_p = p;
5165         info_end = p + info_len;
5166         p = info_end;
5167 
5168         ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
5169         fscid = ceph_decode_32(&info_p);
5170         namelen = ceph_decode_32(&info_p);
5171         ceph_decode_need(&info_p, info_end, namelen, bad);
5172 
5173         if (mds_namespace &&
5174             strlen(mds_namespace) == namelen &&
5175             !strncmp(mds_namespace, (char *)info_p, namelen)) {
5176             mount_fscid = fscid;
5177             break;
5178         }
5179     }
5180 
5181     ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
5182     if (mount_fscid != (u32)-1) {
5183         fsc->client->monc.fs_cluster_id = mount_fscid;
5184         ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
5185                    0, true);
5186         ceph_monc_renew_subs(&fsc->client->monc);
5187     } else {
5188         err = -ENOENT;
5189         goto err_out;
5190     }
5191     return;
5192 
5193 bad:
5194     pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
5195     ceph_umount_begin(mdsc->fsc->sb);
5196 err_out:
5197     mutex_lock(&mdsc->mutex);
5198     mdsc->mdsmap_err = err;
5199     __wake_requests(mdsc, &mdsc->waiting_for_map);
5200     mutex_unlock(&mdsc->mutex);
5201 }
5202 
5203 /*
5204  * handle mds map update.
5205  */
5206 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5207 {
5208     u32 epoch;
5209     u32 maplen;
5210     void *p = msg->front.iov_base;
5211     void *end = p + msg->front.iov_len;
5212     struct ceph_mdsmap *newmap, *oldmap;
5213     struct ceph_fsid fsid;
5214     int err = -EINVAL;
5215 
5216     ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5217     ceph_decode_copy(&p, &fsid, sizeof(fsid));
5218     if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5219         return;
5220     epoch = ceph_decode_32(&p);
5221     maplen = ceph_decode_32(&p);
5222     dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5223 
5224     /* do we need it? */
5225     mutex_lock(&mdsc->mutex);
5226     if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5227         dout("handle_map epoch %u <= our %u\n",
5228              epoch, mdsc->mdsmap->m_epoch);
5229         mutex_unlock(&mdsc->mutex);
5230         return;
5231     }
5232 
5233     newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
5234     if (IS_ERR(newmap)) {
5235         err = PTR_ERR(newmap);
5236         goto bad_unlock;
5237     }
5238 
5239     /* swap into place */
5240     if (mdsc->mdsmap) {
5241         oldmap = mdsc->mdsmap;
5242         mdsc->mdsmap = newmap;
5243         check_new_map(mdsc, newmap, oldmap);
5244         ceph_mdsmap_destroy(oldmap);
5245     } else {
5246         mdsc->mdsmap = newmap;  /* first mds map */
5247     }
5248     mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5249                     MAX_LFS_FILESIZE);
5250 
5251     __wake_requests(mdsc, &mdsc->waiting_for_map);
5252     ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5253               mdsc->mdsmap->m_epoch);
5254 
5255     mutex_unlock(&mdsc->mutex);
5256     schedule_delayed(mdsc, 0);
5257     return;
5258 
5259 bad_unlock:
5260     mutex_unlock(&mdsc->mutex);
5261 bad:
5262     pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
5263     ceph_umount_begin(mdsc->fsc->sb);
5264     return;
5265 }
5266 
5267 static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5268 {
5269     struct ceph_mds_session *s = con->private;
5270 
5271     if (ceph_get_mds_session(s))
5272         return con;
5273     return NULL;
5274 }
5275 
5276 static void mds_put_con(struct ceph_connection *con)
5277 {
5278     struct ceph_mds_session *s = con->private;
5279 
5280     ceph_put_mds_session(s);
5281 }
5282 
5283 /*
5284  * if the client is unresponsive for long enough, the mds will kill
5285  * the session entirely.
5286  */
5287 static void mds_peer_reset(struct ceph_connection *con)
5288 {
5289     struct ceph_mds_session *s = con->private;
5290     struct ceph_mds_client *mdsc = s->s_mdsc;
5291 
5292     pr_warn("mds%d closed our session\n", s->s_mds);
5293     send_mds_reconnect(mdsc, s);
5294 }
5295 
5296 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5297 {
5298     struct ceph_mds_session *s = con->private;
5299     struct ceph_mds_client *mdsc = s->s_mdsc;
5300     int type = le16_to_cpu(msg->hdr.type);
5301 
5302     mutex_lock(&mdsc->mutex);
5303     if (__verify_registered_session(mdsc, s) < 0) {
5304         mutex_unlock(&mdsc->mutex);
5305         goto out;
5306     }
5307     mutex_unlock(&mdsc->mutex);
5308 
5309     switch (type) {
5310     case CEPH_MSG_MDS_MAP:
5311         ceph_mdsc_handle_mdsmap(mdsc, msg);
5312         break;
5313     case CEPH_MSG_FS_MAP_USER:
5314         ceph_mdsc_handle_fsmap(mdsc, msg);
5315         break;
5316     case CEPH_MSG_CLIENT_SESSION:
5317         handle_session(s, msg);
5318         break;
5319     case CEPH_MSG_CLIENT_REPLY:
5320         handle_reply(s, msg);
5321         break;
5322     case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5323         handle_forward(mdsc, s, msg);
5324         break;
5325     case CEPH_MSG_CLIENT_CAPS:
5326         ceph_handle_caps(s, msg);
5327         break;
5328     case CEPH_MSG_CLIENT_SNAP:
5329         ceph_handle_snap(mdsc, s, msg);
5330         break;
5331     case CEPH_MSG_CLIENT_LEASE:
5332         handle_lease(mdsc, s, msg);
5333         break;
5334     case CEPH_MSG_CLIENT_QUOTA:
5335         ceph_handle_quota(mdsc, s, msg);
5336         break;
5337 
5338     default:
5339         pr_err("received unknown message type %d %s\n", type,
5340                ceph_msg_type_name(type));
5341     }
5342 out:
5343     ceph_msg_put(msg);
5344 }
5345 
5346 /*
5347  * authentication
5348  */
5349 
5350 /*
5351  * Note: returned pointer is the address of a structure that's
5352  * managed separately.  Caller must *not* attempt to free it.
5353  */
5354 static struct ceph_auth_handshake *
5355 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5356 {
5357     struct ceph_mds_session *s = con->private;
5358     struct ceph_mds_client *mdsc = s->s_mdsc;
5359     struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5360     struct ceph_auth_handshake *auth = &s->s_auth;
5361     int ret;
5362 
5363     ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5364                      force_new, proto, NULL, NULL);
5365     if (ret)
5366         return ERR_PTR(ret);
5367 
5368     return auth;
5369 }
5370 
5371 static int mds_add_authorizer_challenge(struct ceph_connection *con,
5372                     void *challenge_buf, int challenge_buf_len)
5373 {
5374     struct ceph_mds_session *s = con->private;
5375     struct ceph_mds_client *mdsc = s->s_mdsc;
5376     struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5377 
5378     return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5379                         challenge_buf, challenge_buf_len);
5380 }
5381 
5382 static int mds_verify_authorizer_reply(struct ceph_connection *con)
5383 {
5384     struct ceph_mds_session *s = con->private;
5385     struct ceph_mds_client *mdsc = s->s_mdsc;
5386     struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5387     struct ceph_auth_handshake *auth = &s->s_auth;
5388 
5389     return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5390         auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5391         NULL, NULL, NULL, NULL);
5392 }
5393 
5394 static int mds_invalidate_authorizer(struct ceph_connection *con)
5395 {
5396     struct ceph_mds_session *s = con->private;
5397     struct ceph_mds_client *mdsc = s->s_mdsc;
5398     struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5399 
5400     ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5401 
5402     return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5403 }
5404 
5405 static int mds_get_auth_request(struct ceph_connection *con,
5406                 void *buf, int *buf_len,
5407                 void **authorizer, int *authorizer_len)
5408 {
5409     struct ceph_mds_session *s = con->private;
5410     struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5411     struct ceph_auth_handshake *auth = &s->s_auth;
5412     int ret;
5413 
5414     ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5415                        buf, buf_len);
5416     if (ret)
5417         return ret;
5418 
5419     *authorizer = auth->authorizer_buf;
5420     *authorizer_len = auth->authorizer_buf_len;
5421     return 0;
5422 }
5423 
5424 static int mds_handle_auth_reply_more(struct ceph_connection *con,
5425                       void *reply, int reply_len,
5426                       void *buf, int *buf_len,
5427                       void **authorizer, int *authorizer_len)
5428 {
5429     struct ceph_mds_session *s = con->private;
5430     struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5431     struct ceph_auth_handshake *auth = &s->s_auth;
5432     int ret;
5433 
5434     ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5435                           buf, buf_len);
5436     if (ret)
5437         return ret;
5438 
5439     *authorizer = auth->authorizer_buf;
5440     *authorizer_len = auth->authorizer_buf_len;
5441     return 0;
5442 }
5443 
5444 static int mds_handle_auth_done(struct ceph_connection *con,
5445                 u64 global_id, void *reply, int reply_len,
5446                 u8 *session_key, int *session_key_len,
5447                 u8 *con_secret, int *con_secret_len)
5448 {
5449     struct ceph_mds_session *s = con->private;
5450     struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5451     struct ceph_auth_handshake *auth = &s->s_auth;
5452 
5453     return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5454                            session_key, session_key_len,
5455                            con_secret, con_secret_len);
5456 }
5457 
5458 static int mds_handle_auth_bad_method(struct ceph_connection *con,
5459                       int used_proto, int result,
5460                       const int *allowed_protos, int proto_cnt,
5461                       const int *allowed_modes, int mode_cnt)
5462 {
5463     struct ceph_mds_session *s = con->private;
5464     struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5465     int ret;
5466 
5467     if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5468                         used_proto, result,
5469                         allowed_protos, proto_cnt,
5470                         allowed_modes, mode_cnt)) {
5471         ret = ceph_monc_validate_auth(monc);
5472         if (ret)
5473             return ret;
5474     }
5475 
5476     return -EACCES;
5477 }
5478 
5479 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5480                 struct ceph_msg_header *hdr, int *skip)
5481 {
5482     struct ceph_msg *msg;
5483     int type = (int) le16_to_cpu(hdr->type);
5484     int front_len = (int) le32_to_cpu(hdr->front_len);
5485 
5486     if (con->in_msg)
5487         return con->in_msg;
5488 
5489     *skip = 0;
5490     msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5491     if (!msg) {
5492         pr_err("unable to allocate msg type %d len %d\n",
5493                type, front_len);
5494         return NULL;
5495     }
5496 
5497     return msg;
5498 }
5499 
5500 static int mds_sign_message(struct ceph_msg *msg)
5501 {
5502        struct ceph_mds_session *s = msg->con->private;
5503        struct ceph_auth_handshake *auth = &s->s_auth;
5504 
5505        return ceph_auth_sign_message(auth, msg);
5506 }
5507 
5508 static int mds_check_message_signature(struct ceph_msg *msg)
5509 {
5510        struct ceph_mds_session *s = msg->con->private;
5511        struct ceph_auth_handshake *auth = &s->s_auth;
5512 
5513        return ceph_auth_check_message_signature(auth, msg);
5514 }
5515 
5516 static const struct ceph_connection_operations mds_con_ops = {
5517     .get = mds_get_con,
5518     .put = mds_put_con,
5519     .alloc_msg = mds_alloc_msg,
5520     .dispatch = mds_dispatch,
5521     .peer_reset = mds_peer_reset,
5522     .get_authorizer = mds_get_authorizer,
5523     .add_authorizer_challenge = mds_add_authorizer_challenge,
5524     .verify_authorizer_reply = mds_verify_authorizer_reply,
5525     .invalidate_authorizer = mds_invalidate_authorizer,
5526     .sign_message = mds_sign_message,
5527     .check_message_signature = mds_check_message_signature,
5528     .get_auth_request = mds_get_auth_request,
5529     .handle_auth_reply_more = mds_handle_auth_reply_more,
5530     .handle_auth_done = mds_handle_auth_done,
5531     .handle_auth_bad_method = mds_handle_auth_bad_method,
5532 };
5533 
5534 /* eof */