Back to home page

OSCL-LXR

 
 

    


0001 
0002 /*
0003    rbd.c -- Export ceph rados objects as a Linux block device
0004 
0005 
0006    based on drivers/block/osdblk.c:
0007 
0008    Copyright 2009 Red Hat, Inc.
0009 
0010    This program is free software; you can redistribute it and/or modify
0011    it under the terms of the GNU General Public License as published by
0012    the Free Software Foundation.
0013 
0014    This program is distributed in the hope that it will be useful,
0015    but WITHOUT ANY WARRANTY; without even the implied warranty of
0016    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0017    GNU General Public License for more details.
0018 
0019    You should have received a copy of the GNU General Public License
0020    along with this program; see the file COPYING.  If not, write to
0021    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
0022 
0023 
0024 
0025    For usage instructions, please refer to:
0026 
0027                  Documentation/ABI/testing/sysfs-bus-rbd
0028 
0029  */
0030 
0031 #include <linux/ceph/libceph.h>
0032 #include <linux/ceph/osd_client.h>
0033 #include <linux/ceph/mon_client.h>
0034 #include <linux/ceph/cls_lock_client.h>
0035 #include <linux/ceph/striper.h>
0036 #include <linux/ceph/decode.h>
0037 #include <linux/fs_parser.h>
0038 #include <linux/bsearch.h>
0039 
0040 #include <linux/kernel.h>
0041 #include <linux/device.h>
0042 #include <linux/module.h>
0043 #include <linux/blk-mq.h>
0044 #include <linux/fs.h>
0045 #include <linux/blkdev.h>
0046 #include <linux/slab.h>
0047 #include <linux/idr.h>
0048 #include <linux/workqueue.h>
0049 
0050 #include "rbd_types.h"
0051 
0052 #define RBD_DEBUG   /* Activate rbd_assert() calls */
0053 
0054 /*
0055  * Increment the given counter and return its updated value.
0056  * If the counter is already 0 it will not be incremented.
0057  * If the counter is already at its maximum value returns
0058  * -EINVAL without updating it.
0059  */
0060 static int atomic_inc_return_safe(atomic_t *v)
0061 {
0062     unsigned int counter;
0063 
0064     counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
0065     if (counter <= (unsigned int)INT_MAX)
0066         return (int)counter;
0067 
0068     atomic_dec(v);
0069 
0070     return -EINVAL;
0071 }
0072 
0073 /* Decrement the counter.  Return the resulting value, or -EINVAL */
0074 static int atomic_dec_return_safe(atomic_t *v)
0075 {
0076     int counter;
0077 
0078     counter = atomic_dec_return(v);
0079     if (counter >= 0)
0080         return counter;
0081 
0082     atomic_inc(v);
0083 
0084     return -EINVAL;
0085 }
0086 
0087 #define RBD_DRV_NAME "rbd"
0088 
0089 #define RBD_MINORS_PER_MAJOR        256
0090 #define RBD_SINGLE_MAJOR_PART_SHIFT 4
0091 
0092 #define RBD_MAX_PARENT_CHAIN_LEN    16
0093 
0094 #define RBD_SNAP_DEV_NAME_PREFIX    "snap_"
0095 #define RBD_MAX_SNAP_NAME_LEN   \
0096             (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
0097 
0098 #define RBD_MAX_SNAP_COUNT  510 /* allows max snapc to fit in 4KB */
0099 
0100 #define RBD_SNAP_HEAD_NAME  "-"
0101 
0102 #define BAD_SNAP_INDEX  U32_MAX     /* invalid index into snap array */
0103 
0104 /* This allows a single page to hold an image name sent by OSD */
0105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
0106 #define RBD_IMAGE_ID_LEN_MAX    64
0107 
0108 #define RBD_OBJ_PREFIX_LEN_MAX  64
0109 
0110 #define RBD_NOTIFY_TIMEOUT  5   /* seconds */
0111 #define RBD_RETRY_DELAY     msecs_to_jiffies(1000)
0112 
0113 /* Feature bits */
0114 
0115 #define RBD_FEATURE_LAYERING        (1ULL<<0)
0116 #define RBD_FEATURE_STRIPINGV2      (1ULL<<1)
0117 #define RBD_FEATURE_EXCLUSIVE_LOCK  (1ULL<<2)
0118 #define RBD_FEATURE_OBJECT_MAP      (1ULL<<3)
0119 #define RBD_FEATURE_FAST_DIFF       (1ULL<<4)
0120 #define RBD_FEATURE_DEEP_FLATTEN    (1ULL<<5)
0121 #define RBD_FEATURE_DATA_POOL       (1ULL<<7)
0122 #define RBD_FEATURE_OPERATIONS      (1ULL<<8)
0123 
0124 #define RBD_FEATURES_ALL    (RBD_FEATURE_LAYERING |     \
0125                  RBD_FEATURE_STRIPINGV2 |   \
0126                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
0127                  RBD_FEATURE_OBJECT_MAP |   \
0128                  RBD_FEATURE_FAST_DIFF |    \
0129                  RBD_FEATURE_DEEP_FLATTEN | \
0130                  RBD_FEATURE_DATA_POOL |    \
0131                  RBD_FEATURE_OPERATIONS)
0132 
0133 /* Features supported by this (client software) implementation. */
0134 
0135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
0136 
0137 /*
0138  * An RBD device name will be "rbd#", where the "rbd" comes from
0139  * RBD_DRV_NAME above, and # is a unique integer identifier.
0140  */
0141 #define DEV_NAME_LEN        32
0142 
0143 /*
0144  * block device image metadata (in-memory version)
0145  */
0146 struct rbd_image_header {
0147     /* These six fields never change for a given rbd image */
0148     char *object_prefix;
0149     __u8 obj_order;
0150     u64 stripe_unit;
0151     u64 stripe_count;
0152     s64 data_pool_id;
0153     u64 features;       /* Might be changeable someday? */
0154 
0155     /* The remaining fields need to be updated occasionally */
0156     u64 image_size;
0157     struct ceph_snap_context *snapc;
0158     char *snap_names;   /* format 1 only */
0159     u64 *snap_sizes;    /* format 1 only */
0160 };
0161 
0162 /*
0163  * An rbd image specification.
0164  *
0165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
0166  * identify an image.  Each rbd_dev structure includes a pointer to
0167  * an rbd_spec structure that encapsulates this identity.
0168  *
0169  * Each of the id's in an rbd_spec has an associated name.  For a
0170  * user-mapped image, the names are supplied and the id's associated
0171  * with them are looked up.  For a layered image, a parent image is
0172  * defined by the tuple, and the names are looked up.
0173  *
0174  * An rbd_dev structure contains a parent_spec pointer which is
0175  * non-null if the image it represents is a child in a layered
0176  * image.  This pointer will refer to the rbd_spec structure used
0177  * by the parent rbd_dev for its own identity (i.e., the structure
0178  * is shared between the parent and child).
0179  *
0180  * Since these structures are populated once, during the discovery
0181  * phase of image construction, they are effectively immutable so
0182  * we make no effort to synchronize access to them.
0183  *
0184  * Note that code herein does not assume the image name is known (it
0185  * could be a null pointer).
0186  */
0187 struct rbd_spec {
0188     u64     pool_id;
0189     const char  *pool_name;
0190     const char  *pool_ns;   /* NULL if default, never "" */
0191 
0192     const char  *image_id;
0193     const char  *image_name;
0194 
0195     u64     snap_id;
0196     const char  *snap_name;
0197 
0198     struct kref kref;
0199 };
0200 
0201 /*
0202  * an instance of the client.  multiple devices may share an rbd client.
0203  */
0204 struct rbd_client {
0205     struct ceph_client  *client;
0206     struct kref     kref;
0207     struct list_head    node;
0208 };
0209 
0210 struct pending_result {
0211     int         result;     /* first nonzero result */
0212     int         num_pending;
0213 };
0214 
0215 struct rbd_img_request;
0216 
0217 enum obj_request_type {
0218     OBJ_REQUEST_NODATA = 1,
0219     OBJ_REQUEST_BIO,    /* pointer into provided bio (list) */
0220     OBJ_REQUEST_BVECS,  /* pointer into provided bio_vec array */
0221     OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
0222 };
0223 
0224 enum obj_operation_type {
0225     OBJ_OP_READ = 1,
0226     OBJ_OP_WRITE,
0227     OBJ_OP_DISCARD,
0228     OBJ_OP_ZEROOUT,
0229 };
0230 
0231 #define RBD_OBJ_FLAG_DELETION           (1U << 0)
0232 #define RBD_OBJ_FLAG_COPYUP_ENABLED     (1U << 1)
0233 #define RBD_OBJ_FLAG_COPYUP_ZEROS       (1U << 2)
0234 #define RBD_OBJ_FLAG_MAY_EXIST          (1U << 3)
0235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT   (1U << 4)
0236 
0237 enum rbd_obj_read_state {
0238     RBD_OBJ_READ_START = 1,
0239     RBD_OBJ_READ_OBJECT,
0240     RBD_OBJ_READ_PARENT,
0241 };
0242 
0243 /*
0244  * Writes go through the following state machine to deal with
0245  * layering:
0246  *
0247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
0248  *            .                 |                                    .
0249  *            .                 v                                    .
0250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
0251  *            .                 |                    .               .
0252  *            .                 v                    v (deep-copyup  .
0253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
0254  * flattened) v                 |                    .               .
0255  *            .                 v                    .               .
0256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
0257  *                              |                        not needed) v
0258  *                              v                                    .
0259  *                            done . . . . . . . . . . . . . . . . . .
0260  *                              ^
0261  *                              |
0262  *                     RBD_OBJ_WRITE_FLAT
0263  *
0264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
0265  * assert_exists guard is needed or not (in some cases it's not needed
0266  * even if there is a parent).
0267  */
0268 enum rbd_obj_write_state {
0269     RBD_OBJ_WRITE_START = 1,
0270     RBD_OBJ_WRITE_PRE_OBJECT_MAP,
0271     RBD_OBJ_WRITE_OBJECT,
0272     __RBD_OBJ_WRITE_COPYUP,
0273     RBD_OBJ_WRITE_COPYUP,
0274     RBD_OBJ_WRITE_POST_OBJECT_MAP,
0275 };
0276 
0277 enum rbd_obj_copyup_state {
0278     RBD_OBJ_COPYUP_START = 1,
0279     RBD_OBJ_COPYUP_READ_PARENT,
0280     __RBD_OBJ_COPYUP_OBJECT_MAPS,
0281     RBD_OBJ_COPYUP_OBJECT_MAPS,
0282     __RBD_OBJ_COPYUP_WRITE_OBJECT,
0283     RBD_OBJ_COPYUP_WRITE_OBJECT,
0284 };
0285 
0286 struct rbd_obj_request {
0287     struct ceph_object_extent ex;
0288     unsigned int        flags;  /* RBD_OBJ_FLAG_* */
0289     union {
0290         enum rbd_obj_read_state  read_state;    /* for reads */
0291         enum rbd_obj_write_state write_state;   /* for writes */
0292     };
0293 
0294     struct rbd_img_request  *img_request;
0295     struct ceph_file_extent *img_extents;
0296     u32         num_img_extents;
0297 
0298     union {
0299         struct ceph_bio_iter    bio_pos;
0300         struct {
0301             struct ceph_bvec_iter   bvec_pos;
0302             u32         bvec_count;
0303             u32         bvec_idx;
0304         };
0305     };
0306 
0307     enum rbd_obj_copyup_state copyup_state;
0308     struct bio_vec      *copyup_bvecs;
0309     u32         copyup_bvec_count;
0310 
0311     struct list_head    osd_reqs;   /* w/ r_private_item */
0312 
0313     struct mutex        state_mutex;
0314     struct pending_result   pending;
0315     struct kref     kref;
0316 };
0317 
0318 enum img_req_flags {
0319     IMG_REQ_CHILD,      /* initiator: block = 0, child image = 1 */
0320     IMG_REQ_LAYERED,    /* ENOENT handling: normal = 0, layered = 1 */
0321 };
0322 
0323 enum rbd_img_state {
0324     RBD_IMG_START = 1,
0325     RBD_IMG_EXCLUSIVE_LOCK,
0326     __RBD_IMG_OBJECT_REQUESTS,
0327     RBD_IMG_OBJECT_REQUESTS,
0328 };
0329 
0330 struct rbd_img_request {
0331     struct rbd_device   *rbd_dev;
0332     enum obj_operation_type op_type;
0333     enum obj_request_type   data_type;
0334     unsigned long       flags;
0335     enum rbd_img_state  state;
0336     union {
0337         u64         snap_id;    /* for reads */
0338         struct ceph_snap_context *snapc;    /* for writes */
0339     };
0340     struct rbd_obj_request  *obj_request;   /* obj req initiator */
0341 
0342     struct list_head    lock_item;
0343     struct list_head    object_extents; /* obj_req.ex structs */
0344 
0345     struct mutex        state_mutex;
0346     struct pending_result   pending;
0347     struct work_struct  work;
0348     int         work_result;
0349 };
0350 
0351 #define for_each_obj_request(ireq, oreq) \
0352     list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
0353 #define for_each_obj_request_safe(ireq, oreq, n) \
0354     list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
0355 
0356 enum rbd_watch_state {
0357     RBD_WATCH_STATE_UNREGISTERED,
0358     RBD_WATCH_STATE_REGISTERED,
0359     RBD_WATCH_STATE_ERROR,
0360 };
0361 
0362 enum rbd_lock_state {
0363     RBD_LOCK_STATE_UNLOCKED,
0364     RBD_LOCK_STATE_LOCKED,
0365     RBD_LOCK_STATE_RELEASING,
0366 };
0367 
0368 /* WatchNotify::ClientId */
0369 struct rbd_client_id {
0370     u64 gid;
0371     u64 handle;
0372 };
0373 
0374 struct rbd_mapping {
0375     u64                     size;
0376 };
0377 
0378 /*
0379  * a single device
0380  */
0381 struct rbd_device {
0382     int         dev_id;     /* blkdev unique id */
0383 
0384     int         major;      /* blkdev assigned major */
0385     int         minor;
0386     struct gendisk      *disk;      /* blkdev's gendisk and rq */
0387 
0388     u32         image_format;   /* Either 1 or 2 */
0389     struct rbd_client   *rbd_client;
0390 
0391     char            name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
0392 
0393     spinlock_t      lock;       /* queue, flags, open_count */
0394 
0395     struct rbd_image_header header;
0396     unsigned long       flags;      /* possibly lock protected */
0397     struct rbd_spec     *spec;
0398     struct rbd_options  *opts;
0399     char            *config_info;   /* add{,_single_major} string */
0400 
0401     struct ceph_object_id   header_oid;
0402     struct ceph_object_locator header_oloc;
0403 
0404     struct ceph_file_layout layout;     /* used for all rbd requests */
0405 
0406     struct mutex        watch_mutex;
0407     enum rbd_watch_state    watch_state;
0408     struct ceph_osd_linger_request *watch_handle;
0409     u64         watch_cookie;
0410     struct delayed_work watch_dwork;
0411 
0412     struct rw_semaphore lock_rwsem;
0413     enum rbd_lock_state lock_state;
0414     char            lock_cookie[32];
0415     struct rbd_client_id    owner_cid;
0416     struct work_struct  acquired_lock_work;
0417     struct work_struct  released_lock_work;
0418     struct delayed_work lock_dwork;
0419     struct work_struct  unlock_work;
0420     spinlock_t      lock_lists_lock;
0421     struct list_head    acquiring_list;
0422     struct list_head    running_list;
0423     struct completion   acquire_wait;
0424     int         acquire_err;
0425     struct completion   releasing_wait;
0426 
0427     spinlock_t      object_map_lock;
0428     u8          *object_map;
0429     u64         object_map_size;    /* in objects */
0430     u64         object_map_flags;
0431 
0432     struct workqueue_struct *task_wq;
0433 
0434     struct rbd_spec     *parent_spec;
0435     u64         parent_overlap;
0436     atomic_t        parent_ref;
0437     struct rbd_device   *parent;
0438 
0439     /* Block layer tags. */
0440     struct blk_mq_tag_set   tag_set;
0441 
0442     /* protects updating the header */
0443     struct rw_semaphore     header_rwsem;
0444 
0445     struct rbd_mapping  mapping;
0446 
0447     struct list_head    node;
0448 
0449     /* sysfs related */
0450     struct device       dev;
0451     unsigned long       open_count; /* protected by lock */
0452 };
0453 
0454 /*
0455  * Flag bits for rbd_dev->flags:
0456  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
0457  *   by rbd_dev->lock
0458  */
0459 enum rbd_dev_flags {
0460     RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
0461     RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
0462     RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
0463 };
0464 
0465 static DEFINE_MUTEX(client_mutex);  /* Serialize client creation */
0466 
0467 static LIST_HEAD(rbd_dev_list);    /* devices */
0468 static DEFINE_SPINLOCK(rbd_dev_list_lock);
0469 
0470 static LIST_HEAD(rbd_client_list);      /* clients */
0471 static DEFINE_SPINLOCK(rbd_client_list_lock);
0472 
0473 /* Slab caches for frequently-allocated structures */
0474 
0475 static struct kmem_cache    *rbd_img_request_cache;
0476 static struct kmem_cache    *rbd_obj_request_cache;
0477 
0478 static int rbd_major;
0479 static DEFINE_IDA(rbd_dev_id_ida);
0480 
0481 static struct workqueue_struct *rbd_wq;
0482 
0483 static struct ceph_snap_context rbd_empty_snapc = {
0484     .nref = REFCOUNT_INIT(1),
0485 };
0486 
0487 /*
0488  * single-major requires >= 0.75 version of userspace rbd utility.
0489  */
0490 static bool single_major = true;
0491 module_param(single_major, bool, 0444);
0492 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
0493 
0494 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
0495 static ssize_t remove_store(struct bus_type *bus, const char *buf,
0496                 size_t count);
0497 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
0498                       size_t count);
0499 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
0500                      size_t count);
0501 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
0502 
0503 static int rbd_dev_id_to_minor(int dev_id)
0504 {
0505     return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
0506 }
0507 
0508 static int minor_to_rbd_dev_id(int minor)
0509 {
0510     return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
0511 }
0512 
0513 static bool rbd_is_ro(struct rbd_device *rbd_dev)
0514 {
0515     return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
0516 }
0517 
0518 static bool rbd_is_snap(struct rbd_device *rbd_dev)
0519 {
0520     return rbd_dev->spec->snap_id != CEPH_NOSNAP;
0521 }
0522 
0523 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
0524 {
0525     lockdep_assert_held(&rbd_dev->lock_rwsem);
0526 
0527     return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
0528            rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
0529 }
0530 
0531 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
0532 {
0533     bool is_lock_owner;
0534 
0535     down_read(&rbd_dev->lock_rwsem);
0536     is_lock_owner = __rbd_is_lock_owner(rbd_dev);
0537     up_read(&rbd_dev->lock_rwsem);
0538     return is_lock_owner;
0539 }
0540 
0541 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
0542 {
0543     return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
0544 }
0545 
0546 static BUS_ATTR_WO(add);
0547 static BUS_ATTR_WO(remove);
0548 static BUS_ATTR_WO(add_single_major);
0549 static BUS_ATTR_WO(remove_single_major);
0550 static BUS_ATTR_RO(supported_features);
0551 
0552 static struct attribute *rbd_bus_attrs[] = {
0553     &bus_attr_add.attr,
0554     &bus_attr_remove.attr,
0555     &bus_attr_add_single_major.attr,
0556     &bus_attr_remove_single_major.attr,
0557     &bus_attr_supported_features.attr,
0558     NULL,
0559 };
0560 
0561 static umode_t rbd_bus_is_visible(struct kobject *kobj,
0562                   struct attribute *attr, int index)
0563 {
0564     if (!single_major &&
0565         (attr == &bus_attr_add_single_major.attr ||
0566          attr == &bus_attr_remove_single_major.attr))
0567         return 0;
0568 
0569     return attr->mode;
0570 }
0571 
0572 static const struct attribute_group rbd_bus_group = {
0573     .attrs = rbd_bus_attrs,
0574     .is_visible = rbd_bus_is_visible,
0575 };
0576 __ATTRIBUTE_GROUPS(rbd_bus);
0577 
0578 static struct bus_type rbd_bus_type = {
0579     .name       = "rbd",
0580     .bus_groups = rbd_bus_groups,
0581 };
0582 
0583 static void rbd_root_dev_release(struct device *dev)
0584 {
0585 }
0586 
0587 static struct device rbd_root_dev = {
0588     .init_name =    "rbd",
0589     .release =      rbd_root_dev_release,
0590 };
0591 
0592 static __printf(2, 3)
0593 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
0594 {
0595     struct va_format vaf;
0596     va_list args;
0597 
0598     va_start(args, fmt);
0599     vaf.fmt = fmt;
0600     vaf.va = &args;
0601 
0602     if (!rbd_dev)
0603         printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
0604     else if (rbd_dev->disk)
0605         printk(KERN_WARNING "%s: %s: %pV\n",
0606             RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
0607     else if (rbd_dev->spec && rbd_dev->spec->image_name)
0608         printk(KERN_WARNING "%s: image %s: %pV\n",
0609             RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
0610     else if (rbd_dev->spec && rbd_dev->spec->image_id)
0611         printk(KERN_WARNING "%s: id %s: %pV\n",
0612             RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
0613     else    /* punt */
0614         printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
0615             RBD_DRV_NAME, rbd_dev, &vaf);
0616     va_end(args);
0617 }
0618 
0619 #ifdef RBD_DEBUG
0620 #define rbd_assert(expr)                        \
0621         if (unlikely(!(expr))) {                \
0622             printk(KERN_ERR "\nAssertion failure in %s() "  \
0623                         "at line %d:\n\n"   \
0624                     "\trbd_assert(%s);\n\n",    \
0625                     __func__, __LINE__, #expr); \
0626             BUG();                      \
0627         }
0628 #else /* !RBD_DEBUG */
0629 #  define rbd_assert(expr)  ((void) 0)
0630 #endif /* !RBD_DEBUG */
0631 
0632 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
0633 
0634 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
0635 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
0636 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
0637 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
0638 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
0639                     u64 snap_id);
0640 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
0641                 u8 *order, u64 *snap_size);
0642 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
0643 
0644 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
0645 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
0646 
0647 /*
0648  * Return true if nothing else is pending.
0649  */
0650 static bool pending_result_dec(struct pending_result *pending, int *result)
0651 {
0652     rbd_assert(pending->num_pending > 0);
0653 
0654     if (*result && !pending->result)
0655         pending->result = *result;
0656     if (--pending->num_pending)
0657         return false;
0658 
0659     *result = pending->result;
0660     return true;
0661 }
0662 
0663 static int rbd_open(struct block_device *bdev, fmode_t mode)
0664 {
0665     struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
0666     bool removing = false;
0667 
0668     spin_lock_irq(&rbd_dev->lock);
0669     if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
0670         removing = true;
0671     else
0672         rbd_dev->open_count++;
0673     spin_unlock_irq(&rbd_dev->lock);
0674     if (removing)
0675         return -ENOENT;
0676 
0677     (void) get_device(&rbd_dev->dev);
0678 
0679     return 0;
0680 }
0681 
0682 static void rbd_release(struct gendisk *disk, fmode_t mode)
0683 {
0684     struct rbd_device *rbd_dev = disk->private_data;
0685     unsigned long open_count_before;
0686 
0687     spin_lock_irq(&rbd_dev->lock);
0688     open_count_before = rbd_dev->open_count--;
0689     spin_unlock_irq(&rbd_dev->lock);
0690     rbd_assert(open_count_before > 0);
0691 
0692     put_device(&rbd_dev->dev);
0693 }
0694 
0695 static const struct block_device_operations rbd_bd_ops = {
0696     .owner          = THIS_MODULE,
0697     .open           = rbd_open,
0698     .release        = rbd_release,
0699 };
0700 
0701 /*
0702  * Initialize an rbd client instance.  Success or not, this function
0703  * consumes ceph_opts.  Caller holds client_mutex.
0704  */
0705 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
0706 {
0707     struct rbd_client *rbdc;
0708     int ret = -ENOMEM;
0709 
0710     dout("%s:\n", __func__);
0711     rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
0712     if (!rbdc)
0713         goto out_opt;
0714 
0715     kref_init(&rbdc->kref);
0716     INIT_LIST_HEAD(&rbdc->node);
0717 
0718     rbdc->client = ceph_create_client(ceph_opts, rbdc);
0719     if (IS_ERR(rbdc->client))
0720         goto out_rbdc;
0721     ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
0722 
0723     ret = ceph_open_session(rbdc->client);
0724     if (ret < 0)
0725         goto out_client;
0726 
0727     spin_lock(&rbd_client_list_lock);
0728     list_add_tail(&rbdc->node, &rbd_client_list);
0729     spin_unlock(&rbd_client_list_lock);
0730 
0731     dout("%s: rbdc %p\n", __func__, rbdc);
0732 
0733     return rbdc;
0734 out_client:
0735     ceph_destroy_client(rbdc->client);
0736 out_rbdc:
0737     kfree(rbdc);
0738 out_opt:
0739     if (ceph_opts)
0740         ceph_destroy_options(ceph_opts);
0741     dout("%s: error %d\n", __func__, ret);
0742 
0743     return ERR_PTR(ret);
0744 }
0745 
0746 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
0747 {
0748     kref_get(&rbdc->kref);
0749 
0750     return rbdc;
0751 }
0752 
0753 /*
0754  * Find a ceph client with specific addr and configuration.  If
0755  * found, bump its reference count.
0756  */
0757 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
0758 {
0759     struct rbd_client *rbdc = NULL, *iter;
0760 
0761     if (ceph_opts->flags & CEPH_OPT_NOSHARE)
0762         return NULL;
0763 
0764     spin_lock(&rbd_client_list_lock);
0765     list_for_each_entry(iter, &rbd_client_list, node) {
0766         if (!ceph_compare_options(ceph_opts, iter->client)) {
0767             __rbd_get_client(iter);
0768 
0769             rbdc = iter;
0770             break;
0771         }
0772     }
0773     spin_unlock(&rbd_client_list_lock);
0774 
0775     return rbdc;
0776 }
0777 
0778 /*
0779  * (Per device) rbd map options
0780  */
0781 enum {
0782     Opt_queue_depth,
0783     Opt_alloc_size,
0784     Opt_lock_timeout,
0785     /* int args above */
0786     Opt_pool_ns,
0787     Opt_compression_hint,
0788     /* string args above */
0789     Opt_read_only,
0790     Opt_read_write,
0791     Opt_lock_on_read,
0792     Opt_exclusive,
0793     Opt_notrim,
0794 };
0795 
0796 enum {
0797     Opt_compression_hint_none,
0798     Opt_compression_hint_compressible,
0799     Opt_compression_hint_incompressible,
0800 };
0801 
0802 static const struct constant_table rbd_param_compression_hint[] = {
0803     {"none",        Opt_compression_hint_none},
0804     {"compressible",    Opt_compression_hint_compressible},
0805     {"incompressible",  Opt_compression_hint_incompressible},
0806     {}
0807 };
0808 
0809 static const struct fs_parameter_spec rbd_parameters[] = {
0810     fsparam_u32 ("alloc_size",          Opt_alloc_size),
0811     fsparam_enum    ("compression_hint",        Opt_compression_hint,
0812              rbd_param_compression_hint),
0813     fsparam_flag    ("exclusive",           Opt_exclusive),
0814     fsparam_flag    ("lock_on_read",        Opt_lock_on_read),
0815     fsparam_u32 ("lock_timeout",        Opt_lock_timeout),
0816     fsparam_flag    ("notrim",          Opt_notrim),
0817     fsparam_string  ("_pool_ns",            Opt_pool_ns),
0818     fsparam_u32 ("queue_depth",         Opt_queue_depth),
0819     fsparam_flag    ("read_only",           Opt_read_only),
0820     fsparam_flag    ("read_write",          Opt_read_write),
0821     fsparam_flag    ("ro",              Opt_read_only),
0822     fsparam_flag    ("rw",              Opt_read_write),
0823     {}
0824 };
0825 
0826 struct rbd_options {
0827     int queue_depth;
0828     int alloc_size;
0829     unsigned long   lock_timeout;
0830     bool    read_only;
0831     bool    lock_on_read;
0832     bool    exclusive;
0833     bool    trim;
0834 
0835     u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
0836 };
0837 
0838 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_DEFAULT_RQ
0839 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
0840 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
0841 #define RBD_READ_ONLY_DEFAULT   false
0842 #define RBD_LOCK_ON_READ_DEFAULT false
0843 #define RBD_EXCLUSIVE_DEFAULT   false
0844 #define RBD_TRIM_DEFAULT    true
0845 
0846 struct rbd_parse_opts_ctx {
0847     struct rbd_spec     *spec;
0848     struct ceph_options *copts;
0849     struct rbd_options  *opts;
0850 };
0851 
0852 static char* obj_op_name(enum obj_operation_type op_type)
0853 {
0854     switch (op_type) {
0855     case OBJ_OP_READ:
0856         return "read";
0857     case OBJ_OP_WRITE:
0858         return "write";
0859     case OBJ_OP_DISCARD:
0860         return "discard";
0861     case OBJ_OP_ZEROOUT:
0862         return "zeroout";
0863     default:
0864         return "???";
0865     }
0866 }
0867 
0868 /*
0869  * Destroy ceph client
0870  *
0871  * Caller must hold rbd_client_list_lock.
0872  */
0873 static void rbd_client_release(struct kref *kref)
0874 {
0875     struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
0876 
0877     dout("%s: rbdc %p\n", __func__, rbdc);
0878     spin_lock(&rbd_client_list_lock);
0879     list_del(&rbdc->node);
0880     spin_unlock(&rbd_client_list_lock);
0881 
0882     ceph_destroy_client(rbdc->client);
0883     kfree(rbdc);
0884 }
0885 
0886 /*
0887  * Drop reference to ceph client node. If it's not referenced anymore, release
0888  * it.
0889  */
0890 static void rbd_put_client(struct rbd_client *rbdc)
0891 {
0892     if (rbdc)
0893         kref_put(&rbdc->kref, rbd_client_release);
0894 }
0895 
0896 /*
0897  * Get a ceph client with specific addr and configuration, if one does
0898  * not exist create it.  Either way, ceph_opts is consumed by this
0899  * function.
0900  */
0901 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
0902 {
0903     struct rbd_client *rbdc;
0904     int ret;
0905 
0906     mutex_lock(&client_mutex);
0907     rbdc = rbd_client_find(ceph_opts);
0908     if (rbdc) {
0909         ceph_destroy_options(ceph_opts);
0910 
0911         /*
0912          * Using an existing client.  Make sure ->pg_pools is up to
0913          * date before we look up the pool id in do_rbd_add().
0914          */
0915         ret = ceph_wait_for_latest_osdmap(rbdc->client,
0916                     rbdc->client->options->mount_timeout);
0917         if (ret) {
0918             rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
0919             rbd_put_client(rbdc);
0920             rbdc = ERR_PTR(ret);
0921         }
0922     } else {
0923         rbdc = rbd_client_create(ceph_opts);
0924     }
0925     mutex_unlock(&client_mutex);
0926 
0927     return rbdc;
0928 }
0929 
0930 static bool rbd_image_format_valid(u32 image_format)
0931 {
0932     return image_format == 1 || image_format == 2;
0933 }
0934 
0935 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
0936 {
0937     size_t size;
0938     u32 snap_count;
0939 
0940     /* The header has to start with the magic rbd header text */
0941     if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
0942         return false;
0943 
0944     /* The bio layer requires at least sector-sized I/O */
0945 
0946     if (ondisk->options.order < SECTOR_SHIFT)
0947         return false;
0948 
0949     /* If we use u64 in a few spots we may be able to loosen this */
0950 
0951     if (ondisk->options.order > 8 * sizeof (int) - 1)
0952         return false;
0953 
0954     /*
0955      * The size of a snapshot header has to fit in a size_t, and
0956      * that limits the number of snapshots.
0957      */
0958     snap_count = le32_to_cpu(ondisk->snap_count);
0959     size = SIZE_MAX - sizeof (struct ceph_snap_context);
0960     if (snap_count > size / sizeof (__le64))
0961         return false;
0962 
0963     /*
0964      * Not only that, but the size of the entire the snapshot
0965      * header must also be representable in a size_t.
0966      */
0967     size -= snap_count * sizeof (__le64);
0968     if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
0969         return false;
0970 
0971     return true;
0972 }
0973 
0974 /*
0975  * returns the size of an object in the image
0976  */
0977 static u32 rbd_obj_bytes(struct rbd_image_header *header)
0978 {
0979     return 1U << header->obj_order;
0980 }
0981 
0982 static void rbd_init_layout(struct rbd_device *rbd_dev)
0983 {
0984     if (rbd_dev->header.stripe_unit == 0 ||
0985         rbd_dev->header.stripe_count == 0) {
0986         rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
0987         rbd_dev->header.stripe_count = 1;
0988     }
0989 
0990     rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
0991     rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
0992     rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
0993     rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
0994               rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
0995     RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
0996 }
0997 
0998 /*
0999  * Fill an rbd image header with information from the given format 1
1000  * on-disk header.
1001  */
1002 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1003                  struct rbd_image_header_ondisk *ondisk)
1004 {
1005     struct rbd_image_header *header = &rbd_dev->header;
1006     bool first_time = header->object_prefix == NULL;
1007     struct ceph_snap_context *snapc;
1008     char *object_prefix = NULL;
1009     char *snap_names = NULL;
1010     u64 *snap_sizes = NULL;
1011     u32 snap_count;
1012     int ret = -ENOMEM;
1013     u32 i;
1014 
1015     /* Allocate this now to avoid having to handle failure below */
1016 
1017     if (first_time) {
1018         object_prefix = kstrndup(ondisk->object_prefix,
1019                      sizeof(ondisk->object_prefix),
1020                      GFP_KERNEL);
1021         if (!object_prefix)
1022             return -ENOMEM;
1023     }
1024 
1025     /* Allocate the snapshot context and fill it in */
1026 
1027     snap_count = le32_to_cpu(ondisk->snap_count);
1028     snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1029     if (!snapc)
1030         goto out_err;
1031     snapc->seq = le64_to_cpu(ondisk->snap_seq);
1032     if (snap_count) {
1033         struct rbd_image_snap_ondisk *snaps;
1034         u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1035 
1036         /* We'll keep a copy of the snapshot names... */
1037 
1038         if (snap_names_len > (u64)SIZE_MAX)
1039             goto out_2big;
1040         snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1041         if (!snap_names)
1042             goto out_err;
1043 
1044         /* ...as well as the array of their sizes. */
1045         snap_sizes = kmalloc_array(snap_count,
1046                        sizeof(*header->snap_sizes),
1047                        GFP_KERNEL);
1048         if (!snap_sizes)
1049             goto out_err;
1050 
1051         /*
1052          * Copy the names, and fill in each snapshot's id
1053          * and size.
1054          *
1055          * Note that rbd_dev_v1_header_info() guarantees the
1056          * ondisk buffer we're working with has
1057          * snap_names_len bytes beyond the end of the
1058          * snapshot id array, this memcpy() is safe.
1059          */
1060         memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1061         snaps = ondisk->snaps;
1062         for (i = 0; i < snap_count; i++) {
1063             snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1064             snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1065         }
1066     }
1067 
1068     /* We won't fail any more, fill in the header */
1069 
1070     if (first_time) {
1071         header->object_prefix = object_prefix;
1072         header->obj_order = ondisk->options.order;
1073         rbd_init_layout(rbd_dev);
1074     } else {
1075         ceph_put_snap_context(header->snapc);
1076         kfree(header->snap_names);
1077         kfree(header->snap_sizes);
1078     }
1079 
1080     /* The remaining fields always get updated (when we refresh) */
1081 
1082     header->image_size = le64_to_cpu(ondisk->image_size);
1083     header->snapc = snapc;
1084     header->snap_names = snap_names;
1085     header->snap_sizes = snap_sizes;
1086 
1087     return 0;
1088 out_2big:
1089     ret = -EIO;
1090 out_err:
1091     kfree(snap_sizes);
1092     kfree(snap_names);
1093     ceph_put_snap_context(snapc);
1094     kfree(object_prefix);
1095 
1096     return ret;
1097 }
1098 
1099 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1100 {
1101     const char *snap_name;
1102 
1103     rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1104 
1105     /* Skip over names until we find the one we are looking for */
1106 
1107     snap_name = rbd_dev->header.snap_names;
1108     while (which--)
1109         snap_name += strlen(snap_name) + 1;
1110 
1111     return kstrdup(snap_name, GFP_KERNEL);
1112 }
1113 
1114 /*
1115  * Snapshot id comparison function for use with qsort()/bsearch().
1116  * Note that result is for snapshots in *descending* order.
1117  */
1118 static int snapid_compare_reverse(const void *s1, const void *s2)
1119 {
1120     u64 snap_id1 = *(u64 *)s1;
1121     u64 snap_id2 = *(u64 *)s2;
1122 
1123     if (snap_id1 < snap_id2)
1124         return 1;
1125     return snap_id1 == snap_id2 ? 0 : -1;
1126 }
1127 
1128 /*
1129  * Search a snapshot context to see if the given snapshot id is
1130  * present.
1131  *
1132  * Returns the position of the snapshot id in the array if it's found,
1133  * or BAD_SNAP_INDEX otherwise.
1134  *
1135  * Note: The snapshot array is in kept sorted (by the osd) in
1136  * reverse order, highest snapshot id first.
1137  */
1138 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1139 {
1140     struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1141     u64 *found;
1142 
1143     found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1144                 sizeof (snap_id), snapid_compare_reverse);
1145 
1146     return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1147 }
1148 
1149 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1150                     u64 snap_id)
1151 {
1152     u32 which;
1153     const char *snap_name;
1154 
1155     which = rbd_dev_snap_index(rbd_dev, snap_id);
1156     if (which == BAD_SNAP_INDEX)
1157         return ERR_PTR(-ENOENT);
1158 
1159     snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1160     return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1161 }
1162 
1163 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1164 {
1165     if (snap_id == CEPH_NOSNAP)
1166         return RBD_SNAP_HEAD_NAME;
1167 
1168     rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1169     if (rbd_dev->image_format == 1)
1170         return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1171 
1172     return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1173 }
1174 
1175 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1176                 u64 *snap_size)
1177 {
1178     rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1179     if (snap_id == CEPH_NOSNAP) {
1180         *snap_size = rbd_dev->header.image_size;
1181     } else if (rbd_dev->image_format == 1) {
1182         u32 which;
1183 
1184         which = rbd_dev_snap_index(rbd_dev, snap_id);
1185         if (which == BAD_SNAP_INDEX)
1186             return -ENOENT;
1187 
1188         *snap_size = rbd_dev->header.snap_sizes[which];
1189     } else {
1190         u64 size = 0;
1191         int ret;
1192 
1193         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1194         if (ret)
1195             return ret;
1196 
1197         *snap_size = size;
1198     }
1199     return 0;
1200 }
1201 
1202 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1203 {
1204     u64 snap_id = rbd_dev->spec->snap_id;
1205     u64 size = 0;
1206     int ret;
1207 
1208     ret = rbd_snap_size(rbd_dev, snap_id, &size);
1209     if (ret)
1210         return ret;
1211 
1212     rbd_dev->mapping.size = size;
1213     return 0;
1214 }
1215 
1216 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1217 {
1218     rbd_dev->mapping.size = 0;
1219 }
1220 
1221 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1222 {
1223     struct ceph_bio_iter it = *bio_pos;
1224 
1225     ceph_bio_iter_advance(&it, off);
1226     ceph_bio_iter_advance_step(&it, bytes, ({
1227         memzero_bvec(&bv);
1228     }));
1229 }
1230 
1231 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1232 {
1233     struct ceph_bvec_iter it = *bvec_pos;
1234 
1235     ceph_bvec_iter_advance(&it, off);
1236     ceph_bvec_iter_advance_step(&it, bytes, ({
1237         memzero_bvec(&bv);
1238     }));
1239 }
1240 
1241 /*
1242  * Zero a range in @obj_req data buffer defined by a bio (list) or
1243  * (private) bio_vec array.
1244  *
1245  * @off is relative to the start of the data buffer.
1246  */
1247 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1248                    u32 bytes)
1249 {
1250     dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1251 
1252     switch (obj_req->img_request->data_type) {
1253     case OBJ_REQUEST_BIO:
1254         zero_bios(&obj_req->bio_pos, off, bytes);
1255         break;
1256     case OBJ_REQUEST_BVECS:
1257     case OBJ_REQUEST_OWN_BVECS:
1258         zero_bvecs(&obj_req->bvec_pos, off, bytes);
1259         break;
1260     default:
1261         BUG();
1262     }
1263 }
1264 
1265 static void rbd_obj_request_destroy(struct kref *kref);
1266 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1267 {
1268     rbd_assert(obj_request != NULL);
1269     dout("%s: obj %p (was %d)\n", __func__, obj_request,
1270         kref_read(&obj_request->kref));
1271     kref_put(&obj_request->kref, rbd_obj_request_destroy);
1272 }
1273 
1274 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1275                     struct rbd_obj_request *obj_request)
1276 {
1277     rbd_assert(obj_request->img_request == NULL);
1278 
1279     /* Image request now owns object's original reference */
1280     obj_request->img_request = img_request;
1281     dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1282 }
1283 
1284 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1285                     struct rbd_obj_request *obj_request)
1286 {
1287     dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1288     list_del(&obj_request->ex.oe_item);
1289     rbd_assert(obj_request->img_request == img_request);
1290     rbd_obj_request_put(obj_request);
1291 }
1292 
1293 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1294 {
1295     struct rbd_obj_request *obj_req = osd_req->r_priv;
1296 
1297     dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1298          __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1299          obj_req->ex.oe_off, obj_req->ex.oe_len);
1300     ceph_osdc_start_request(osd_req->r_osdc, osd_req);
1301 }
1302 
1303 /*
1304  * The default/initial value for all image request flags is 0.  Each
1305  * is conditionally set to 1 at image request initialization time
1306  * and currently never change thereafter.
1307  */
1308 static void img_request_layered_set(struct rbd_img_request *img_request)
1309 {
1310     set_bit(IMG_REQ_LAYERED, &img_request->flags);
1311 }
1312 
1313 static bool img_request_layered_test(struct rbd_img_request *img_request)
1314 {
1315     return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1316 }
1317 
1318 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1319 {
1320     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1321 
1322     return !obj_req->ex.oe_off &&
1323            obj_req->ex.oe_len == rbd_dev->layout.object_size;
1324 }
1325 
1326 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1327 {
1328     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1329 
1330     return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1331                     rbd_dev->layout.object_size;
1332 }
1333 
1334 /*
1335  * Must be called after rbd_obj_calc_img_extents().
1336  */
1337 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1338 {
1339     if (!obj_req->num_img_extents ||
1340         (rbd_obj_is_entire(obj_req) &&
1341          !obj_req->img_request->snapc->num_snaps))
1342         return false;
1343 
1344     return true;
1345 }
1346 
1347 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1348 {
1349     return ceph_file_extents_bytes(obj_req->img_extents,
1350                        obj_req->num_img_extents);
1351 }
1352 
1353 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1354 {
1355     switch (img_req->op_type) {
1356     case OBJ_OP_READ:
1357         return false;
1358     case OBJ_OP_WRITE:
1359     case OBJ_OP_DISCARD:
1360     case OBJ_OP_ZEROOUT:
1361         return true;
1362     default:
1363         BUG();
1364     }
1365 }
1366 
1367 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1368 {
1369     struct rbd_obj_request *obj_req = osd_req->r_priv;
1370     int result;
1371 
1372     dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1373          osd_req->r_result, obj_req);
1374 
1375     /*
1376      * Writes aren't allowed to return a data payload.  In some
1377      * guarded write cases (e.g. stat + zero on an empty object)
1378      * a stat response makes it through, but we don't care.
1379      */
1380     if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1381         result = 0;
1382     else
1383         result = osd_req->r_result;
1384 
1385     rbd_obj_handle_request(obj_req, result);
1386 }
1387 
1388 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1389 {
1390     struct rbd_obj_request *obj_request = osd_req->r_priv;
1391     struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1392     struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1393 
1394     osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1395     osd_req->r_snapid = obj_request->img_request->snap_id;
1396 }
1397 
1398 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1399 {
1400     struct rbd_obj_request *obj_request = osd_req->r_priv;
1401 
1402     osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1403     ktime_get_real_ts64(&osd_req->r_mtime);
1404     osd_req->r_data_offset = obj_request->ex.oe_off;
1405 }
1406 
1407 static struct ceph_osd_request *
1408 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1409               struct ceph_snap_context *snapc, int num_ops)
1410 {
1411     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1412     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1413     struct ceph_osd_request *req;
1414     const char *name_format = rbd_dev->image_format == 1 ?
1415                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1416     int ret;
1417 
1418     req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1419     if (!req)
1420         return ERR_PTR(-ENOMEM);
1421 
1422     list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1423     req->r_callback = rbd_osd_req_callback;
1424     req->r_priv = obj_req;
1425 
1426     /*
1427      * Data objects may be stored in a separate pool, but always in
1428      * the same namespace in that pool as the header in its pool.
1429      */
1430     ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1431     req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1432 
1433     ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1434                    rbd_dev->header.object_prefix,
1435                    obj_req->ex.oe_objno);
1436     if (ret)
1437         return ERR_PTR(ret);
1438 
1439     return req;
1440 }
1441 
1442 static struct ceph_osd_request *
1443 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1444 {
1445     return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1446                      num_ops);
1447 }
1448 
1449 static struct rbd_obj_request *rbd_obj_request_create(void)
1450 {
1451     struct rbd_obj_request *obj_request;
1452 
1453     obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1454     if (!obj_request)
1455         return NULL;
1456 
1457     ceph_object_extent_init(&obj_request->ex);
1458     INIT_LIST_HEAD(&obj_request->osd_reqs);
1459     mutex_init(&obj_request->state_mutex);
1460     kref_init(&obj_request->kref);
1461 
1462     dout("%s %p\n", __func__, obj_request);
1463     return obj_request;
1464 }
1465 
1466 static void rbd_obj_request_destroy(struct kref *kref)
1467 {
1468     struct rbd_obj_request *obj_request;
1469     struct ceph_osd_request *osd_req;
1470     u32 i;
1471 
1472     obj_request = container_of(kref, struct rbd_obj_request, kref);
1473 
1474     dout("%s: obj %p\n", __func__, obj_request);
1475 
1476     while (!list_empty(&obj_request->osd_reqs)) {
1477         osd_req = list_first_entry(&obj_request->osd_reqs,
1478                     struct ceph_osd_request, r_private_item);
1479         list_del_init(&osd_req->r_private_item);
1480         ceph_osdc_put_request(osd_req);
1481     }
1482 
1483     switch (obj_request->img_request->data_type) {
1484     case OBJ_REQUEST_NODATA:
1485     case OBJ_REQUEST_BIO:
1486     case OBJ_REQUEST_BVECS:
1487         break;      /* Nothing to do */
1488     case OBJ_REQUEST_OWN_BVECS:
1489         kfree(obj_request->bvec_pos.bvecs);
1490         break;
1491     default:
1492         BUG();
1493     }
1494 
1495     kfree(obj_request->img_extents);
1496     if (obj_request->copyup_bvecs) {
1497         for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1498             if (obj_request->copyup_bvecs[i].bv_page)
1499                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1500         }
1501         kfree(obj_request->copyup_bvecs);
1502     }
1503 
1504     kmem_cache_free(rbd_obj_request_cache, obj_request);
1505 }
1506 
1507 /* It's OK to call this for a device with no parent */
1508 
1509 static void rbd_spec_put(struct rbd_spec *spec);
1510 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1511 {
1512     rbd_dev_remove_parent(rbd_dev);
1513     rbd_spec_put(rbd_dev->parent_spec);
1514     rbd_dev->parent_spec = NULL;
1515     rbd_dev->parent_overlap = 0;
1516 }
1517 
1518 /*
1519  * Parent image reference counting is used to determine when an
1520  * image's parent fields can be safely torn down--after there are no
1521  * more in-flight requests to the parent image.  When the last
1522  * reference is dropped, cleaning them up is safe.
1523  */
1524 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1525 {
1526     int counter;
1527 
1528     if (!rbd_dev->parent_spec)
1529         return;
1530 
1531     counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1532     if (counter > 0)
1533         return;
1534 
1535     /* Last reference; clean up parent data structures */
1536 
1537     if (!counter)
1538         rbd_dev_unparent(rbd_dev);
1539     else
1540         rbd_warn(rbd_dev, "parent reference underflow");
1541 }
1542 
1543 /*
1544  * If an image has a non-zero parent overlap, get a reference to its
1545  * parent.
1546  *
1547  * Returns true if the rbd device has a parent with a non-zero
1548  * overlap and a reference for it was successfully taken, or
1549  * false otherwise.
1550  */
1551 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1552 {
1553     int counter = 0;
1554 
1555     if (!rbd_dev->parent_spec)
1556         return false;
1557 
1558     if (rbd_dev->parent_overlap)
1559         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1560 
1561     if (counter < 0)
1562         rbd_warn(rbd_dev, "parent reference overflow");
1563 
1564     return counter > 0;
1565 }
1566 
1567 static void rbd_img_request_init(struct rbd_img_request *img_request,
1568                  struct rbd_device *rbd_dev,
1569                  enum obj_operation_type op_type)
1570 {
1571     memset(img_request, 0, sizeof(*img_request));
1572 
1573     img_request->rbd_dev = rbd_dev;
1574     img_request->op_type = op_type;
1575 
1576     INIT_LIST_HEAD(&img_request->lock_item);
1577     INIT_LIST_HEAD(&img_request->object_extents);
1578     mutex_init(&img_request->state_mutex);
1579 }
1580 
1581 static void rbd_img_capture_header(struct rbd_img_request *img_req)
1582 {
1583     struct rbd_device *rbd_dev = img_req->rbd_dev;
1584 
1585     lockdep_assert_held(&rbd_dev->header_rwsem);
1586 
1587     if (rbd_img_is_write(img_req))
1588         img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1589     else
1590         img_req->snap_id = rbd_dev->spec->snap_id;
1591 
1592     if (rbd_dev_parent_get(rbd_dev))
1593         img_request_layered_set(img_req);
1594 }
1595 
1596 static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1597 {
1598     struct rbd_obj_request *obj_request;
1599     struct rbd_obj_request *next_obj_request;
1600 
1601     dout("%s: img %p\n", __func__, img_request);
1602 
1603     WARN_ON(!list_empty(&img_request->lock_item));
1604     for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1605         rbd_img_obj_request_del(img_request, obj_request);
1606 
1607     if (img_request_layered_test(img_request))
1608         rbd_dev_parent_put(img_request->rbd_dev);
1609 
1610     if (rbd_img_is_write(img_request))
1611         ceph_put_snap_context(img_request->snapc);
1612 
1613     if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1614         kmem_cache_free(rbd_img_request_cache, img_request);
1615 }
1616 
1617 #define BITS_PER_OBJ    2
1618 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1619 #define OBJ_MASK    ((1 << BITS_PER_OBJ) - 1)
1620 
1621 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1622                    u64 *index, u8 *shift)
1623 {
1624     u32 off;
1625 
1626     rbd_assert(objno < rbd_dev->object_map_size);
1627     *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1628     *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1629 }
1630 
1631 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1632 {
1633     u64 index;
1634     u8 shift;
1635 
1636     lockdep_assert_held(&rbd_dev->object_map_lock);
1637     __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1638     return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1639 }
1640 
1641 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1642 {
1643     u64 index;
1644     u8 shift;
1645     u8 *p;
1646 
1647     lockdep_assert_held(&rbd_dev->object_map_lock);
1648     rbd_assert(!(val & ~OBJ_MASK));
1649 
1650     __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1651     p = &rbd_dev->object_map[index];
1652     *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1653 }
1654 
1655 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1656 {
1657     u8 state;
1658 
1659     spin_lock(&rbd_dev->object_map_lock);
1660     state = __rbd_object_map_get(rbd_dev, objno);
1661     spin_unlock(&rbd_dev->object_map_lock);
1662     return state;
1663 }
1664 
1665 static bool use_object_map(struct rbd_device *rbd_dev)
1666 {
1667     /*
1668      * An image mapped read-only can't use the object map -- it isn't
1669      * loaded because the header lock isn't acquired.  Someone else can
1670      * write to the image and update the object map behind our back.
1671      *
1672      * A snapshot can't be written to, so using the object map is always
1673      * safe.
1674      */
1675     if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1676         return false;
1677 
1678     return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1679         !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1680 }
1681 
1682 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1683 {
1684     u8 state;
1685 
1686     /* fall back to default logic if object map is disabled or invalid */
1687     if (!use_object_map(rbd_dev))
1688         return true;
1689 
1690     state = rbd_object_map_get(rbd_dev, objno);
1691     return state != OBJECT_NONEXISTENT;
1692 }
1693 
1694 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1695                 struct ceph_object_id *oid)
1696 {
1697     if (snap_id == CEPH_NOSNAP)
1698         ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1699                 rbd_dev->spec->image_id);
1700     else
1701         ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1702                 rbd_dev->spec->image_id, snap_id);
1703 }
1704 
1705 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1706 {
1707     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1708     CEPH_DEFINE_OID_ONSTACK(oid);
1709     u8 lock_type;
1710     char *lock_tag;
1711     struct ceph_locker *lockers;
1712     u32 num_lockers;
1713     bool broke_lock = false;
1714     int ret;
1715 
1716     rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1717 
1718 again:
1719     ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1720                 CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1721     if (ret != -EBUSY || broke_lock) {
1722         if (ret == -EEXIST)
1723             ret = 0; /* already locked by myself */
1724         if (ret)
1725             rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1726         return ret;
1727     }
1728 
1729     ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1730                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1731                  &lockers, &num_lockers);
1732     if (ret) {
1733         if (ret == -ENOENT)
1734             goto again;
1735 
1736         rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1737         return ret;
1738     }
1739 
1740     kfree(lock_tag);
1741     if (num_lockers == 0)
1742         goto again;
1743 
1744     rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1745          ENTITY_NAME(lockers[0].id.name));
1746 
1747     ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1748                   RBD_LOCK_NAME, lockers[0].id.cookie,
1749                   &lockers[0].id.name);
1750     ceph_free_lockers(lockers, num_lockers);
1751     if (ret) {
1752         if (ret == -ENOENT)
1753             goto again;
1754 
1755         rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1756         return ret;
1757     }
1758 
1759     broke_lock = true;
1760     goto again;
1761 }
1762 
1763 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1764 {
1765     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1766     CEPH_DEFINE_OID_ONSTACK(oid);
1767     int ret;
1768 
1769     rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1770 
1771     ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1772                   "");
1773     if (ret && ret != -ENOENT)
1774         rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1775 }
1776 
1777 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1778 {
1779     u8 struct_v;
1780     u32 struct_len;
1781     u32 header_len;
1782     void *header_end;
1783     int ret;
1784 
1785     ceph_decode_32_safe(p, end, header_len, e_inval);
1786     header_end = *p + header_len;
1787 
1788     ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1789                   &struct_len);
1790     if (ret)
1791         return ret;
1792 
1793     ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1794 
1795     *p = header_end;
1796     return 0;
1797 
1798 e_inval:
1799     return -EINVAL;
1800 }
1801 
1802 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1803 {
1804     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1805     CEPH_DEFINE_OID_ONSTACK(oid);
1806     struct page **pages;
1807     void *p, *end;
1808     size_t reply_len;
1809     u64 num_objects;
1810     u64 object_map_bytes;
1811     u64 object_map_size;
1812     int num_pages;
1813     int ret;
1814 
1815     rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1816 
1817     num_objects = ceph_get_num_objects(&rbd_dev->layout,
1818                        rbd_dev->mapping.size);
1819     object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1820                         BITS_PER_BYTE);
1821     num_pages = calc_pages_for(0, object_map_bytes) + 1;
1822     pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1823     if (IS_ERR(pages))
1824         return PTR_ERR(pages);
1825 
1826     reply_len = num_pages * PAGE_SIZE;
1827     rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1828     ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1829                  "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1830                  NULL, 0, pages, &reply_len);
1831     if (ret)
1832         goto out;
1833 
1834     p = page_address(pages[0]);
1835     end = p + min(reply_len, (size_t)PAGE_SIZE);
1836     ret = decode_object_map_header(&p, end, &object_map_size);
1837     if (ret)
1838         goto out;
1839 
1840     if (object_map_size != num_objects) {
1841         rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1842              object_map_size, num_objects);
1843         ret = -EINVAL;
1844         goto out;
1845     }
1846 
1847     if (offset_in_page(p) + object_map_bytes > reply_len) {
1848         ret = -EINVAL;
1849         goto out;
1850     }
1851 
1852     rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1853     if (!rbd_dev->object_map) {
1854         ret = -ENOMEM;
1855         goto out;
1856     }
1857 
1858     rbd_dev->object_map_size = object_map_size;
1859     ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1860                    offset_in_page(p), object_map_bytes);
1861 
1862 out:
1863     ceph_release_page_vector(pages, num_pages);
1864     return ret;
1865 }
1866 
1867 static void rbd_object_map_free(struct rbd_device *rbd_dev)
1868 {
1869     kvfree(rbd_dev->object_map);
1870     rbd_dev->object_map = NULL;
1871     rbd_dev->object_map_size = 0;
1872 }
1873 
1874 static int rbd_object_map_load(struct rbd_device *rbd_dev)
1875 {
1876     int ret;
1877 
1878     ret = __rbd_object_map_load(rbd_dev);
1879     if (ret)
1880         return ret;
1881 
1882     ret = rbd_dev_v2_get_flags(rbd_dev);
1883     if (ret) {
1884         rbd_object_map_free(rbd_dev);
1885         return ret;
1886     }
1887 
1888     if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1889         rbd_warn(rbd_dev, "object map is invalid");
1890 
1891     return 0;
1892 }
1893 
1894 static int rbd_object_map_open(struct rbd_device *rbd_dev)
1895 {
1896     int ret;
1897 
1898     ret = rbd_object_map_lock(rbd_dev);
1899     if (ret)
1900         return ret;
1901 
1902     ret = rbd_object_map_load(rbd_dev);
1903     if (ret) {
1904         rbd_object_map_unlock(rbd_dev);
1905         return ret;
1906     }
1907 
1908     return 0;
1909 }
1910 
1911 static void rbd_object_map_close(struct rbd_device *rbd_dev)
1912 {
1913     rbd_object_map_free(rbd_dev);
1914     rbd_object_map_unlock(rbd_dev);
1915 }
1916 
1917 /*
1918  * This function needs snap_id (or more precisely just something to
1919  * distinguish between HEAD and snapshot object maps), new_state and
1920  * current_state that were passed to rbd_object_map_update().
1921  *
1922  * To avoid allocating and stashing a context we piggyback on the OSD
1923  * request.  A HEAD update has two ops (assert_locked).  For new_state
1924  * and current_state we decode our own object_map_update op, encoded in
1925  * rbd_cls_object_map_update().
1926  */
1927 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1928                     struct ceph_osd_request *osd_req)
1929 {
1930     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1931     struct ceph_osd_data *osd_data;
1932     u64 objno;
1933     u8 state, new_state, current_state;
1934     bool has_current_state;
1935     void *p;
1936 
1937     if (osd_req->r_result)
1938         return osd_req->r_result;
1939 
1940     /*
1941      * Nothing to do for a snapshot object map.
1942      */
1943     if (osd_req->r_num_ops == 1)
1944         return 0;
1945 
1946     /*
1947      * Update in-memory HEAD object map.
1948      */
1949     rbd_assert(osd_req->r_num_ops == 2);
1950     osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1951     rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1952 
1953     p = page_address(osd_data->pages[0]);
1954     objno = ceph_decode_64(&p);
1955     rbd_assert(objno == obj_req->ex.oe_objno);
1956     rbd_assert(ceph_decode_64(&p) == objno + 1);
1957     new_state = ceph_decode_8(&p);
1958     has_current_state = ceph_decode_8(&p);
1959     if (has_current_state)
1960         current_state = ceph_decode_8(&p);
1961 
1962     spin_lock(&rbd_dev->object_map_lock);
1963     state = __rbd_object_map_get(rbd_dev, objno);
1964     if (!has_current_state || current_state == state ||
1965         (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1966         __rbd_object_map_set(rbd_dev, objno, new_state);
1967     spin_unlock(&rbd_dev->object_map_lock);
1968 
1969     return 0;
1970 }
1971 
1972 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1973 {
1974     struct rbd_obj_request *obj_req = osd_req->r_priv;
1975     int result;
1976 
1977     dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1978          osd_req->r_result, obj_req);
1979 
1980     result = rbd_object_map_update_finish(obj_req, osd_req);
1981     rbd_obj_handle_request(obj_req, result);
1982 }
1983 
1984 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
1985 {
1986     u8 state = rbd_object_map_get(rbd_dev, objno);
1987 
1988     if (state == new_state ||
1989         (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
1990         (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
1991         return false;
1992 
1993     return true;
1994 }
1995 
1996 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
1997                      int which, u64 objno, u8 new_state,
1998                      const u8 *current_state)
1999 {
2000     struct page **pages;
2001     void *p, *start;
2002     int ret;
2003 
2004     ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2005     if (ret)
2006         return ret;
2007 
2008     pages = ceph_alloc_page_vector(1, GFP_NOIO);
2009     if (IS_ERR(pages))
2010         return PTR_ERR(pages);
2011 
2012     p = start = page_address(pages[0]);
2013     ceph_encode_64(&p, objno);
2014     ceph_encode_64(&p, objno + 1);
2015     ceph_encode_8(&p, new_state);
2016     if (current_state) {
2017         ceph_encode_8(&p, 1);
2018         ceph_encode_8(&p, *current_state);
2019     } else {
2020         ceph_encode_8(&p, 0);
2021     }
2022 
2023     osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2024                       false, true);
2025     return 0;
2026 }
2027 
2028 /*
2029  * Return:
2030  *   0 - object map update sent
2031  *   1 - object map update isn't needed
2032  *  <0 - error
2033  */
2034 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2035                  u8 new_state, const u8 *current_state)
2036 {
2037     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2038     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2039     struct ceph_osd_request *req;
2040     int num_ops = 1;
2041     int which = 0;
2042     int ret;
2043 
2044     if (snap_id == CEPH_NOSNAP) {
2045         if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2046             return 1;
2047 
2048         num_ops++; /* assert_locked */
2049     }
2050 
2051     req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2052     if (!req)
2053         return -ENOMEM;
2054 
2055     list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2056     req->r_callback = rbd_object_map_callback;
2057     req->r_priv = obj_req;
2058 
2059     rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2060     ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2061     req->r_flags = CEPH_OSD_FLAG_WRITE;
2062     ktime_get_real_ts64(&req->r_mtime);
2063 
2064     if (snap_id == CEPH_NOSNAP) {
2065         /*
2066          * Protect against possible race conditions during lock
2067          * ownership transitions.
2068          */
2069         ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2070                          CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2071         if (ret)
2072             return ret;
2073     }
2074 
2075     ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2076                     new_state, current_state);
2077     if (ret)
2078         return ret;
2079 
2080     ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2081     if (ret)
2082         return ret;
2083 
2084     ceph_osdc_start_request(osdc, req);
2085     return 0;
2086 }
2087 
2088 static void prune_extents(struct ceph_file_extent *img_extents,
2089               u32 *num_img_extents, u64 overlap)
2090 {
2091     u32 cnt = *num_img_extents;
2092 
2093     /* drop extents completely beyond the overlap */
2094     while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2095         cnt--;
2096 
2097     if (cnt) {
2098         struct ceph_file_extent *ex = &img_extents[cnt - 1];
2099 
2100         /* trim final overlapping extent */
2101         if (ex->fe_off + ex->fe_len > overlap)
2102             ex->fe_len = overlap - ex->fe_off;
2103     }
2104 
2105     *num_img_extents = cnt;
2106 }
2107 
2108 /*
2109  * Determine the byte range(s) covered by either just the object extent
2110  * or the entire object in the parent image.
2111  */
2112 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2113                     bool entire)
2114 {
2115     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2116     int ret;
2117 
2118     if (!rbd_dev->parent_overlap)
2119         return 0;
2120 
2121     ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2122                   entire ? 0 : obj_req->ex.oe_off,
2123                   entire ? rbd_dev->layout.object_size :
2124                             obj_req->ex.oe_len,
2125                   &obj_req->img_extents,
2126                   &obj_req->num_img_extents);
2127     if (ret)
2128         return ret;
2129 
2130     prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2131               rbd_dev->parent_overlap);
2132     return 0;
2133 }
2134 
2135 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2136 {
2137     struct rbd_obj_request *obj_req = osd_req->r_priv;
2138 
2139     switch (obj_req->img_request->data_type) {
2140     case OBJ_REQUEST_BIO:
2141         osd_req_op_extent_osd_data_bio(osd_req, which,
2142                            &obj_req->bio_pos,
2143                            obj_req->ex.oe_len);
2144         break;
2145     case OBJ_REQUEST_BVECS:
2146     case OBJ_REQUEST_OWN_BVECS:
2147         rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2148                             obj_req->ex.oe_len);
2149         rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2150         osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2151                             &obj_req->bvec_pos);
2152         break;
2153     default:
2154         BUG();
2155     }
2156 }
2157 
2158 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2159 {
2160     struct page **pages;
2161 
2162     /*
2163      * The response data for a STAT call consists of:
2164      *     le64 length;
2165      *     struct {
2166      *         le32 tv_sec;
2167      *         le32 tv_nsec;
2168      *     } mtime;
2169      */
2170     pages = ceph_alloc_page_vector(1, GFP_NOIO);
2171     if (IS_ERR(pages))
2172         return PTR_ERR(pages);
2173 
2174     osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2175     osd_req_op_raw_data_in_pages(osd_req, which, pages,
2176                      8 + sizeof(struct ceph_timespec),
2177                      0, false, true);
2178     return 0;
2179 }
2180 
2181 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2182                 u32 bytes)
2183 {
2184     struct rbd_obj_request *obj_req = osd_req->r_priv;
2185     int ret;
2186 
2187     ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2188     if (ret)
2189         return ret;
2190 
2191     osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2192                       obj_req->copyup_bvec_count, bytes);
2193     return 0;
2194 }
2195 
2196 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2197 {
2198     obj_req->read_state = RBD_OBJ_READ_START;
2199     return 0;
2200 }
2201 
2202 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2203                       int which)
2204 {
2205     struct rbd_obj_request *obj_req = osd_req->r_priv;
2206     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2207     u16 opcode;
2208 
2209     if (!use_object_map(rbd_dev) ||
2210         !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2211         osd_req_op_alloc_hint_init(osd_req, which++,
2212                        rbd_dev->layout.object_size,
2213                        rbd_dev->layout.object_size,
2214                        rbd_dev->opts->alloc_hint_flags);
2215     }
2216 
2217     if (rbd_obj_is_entire(obj_req))
2218         opcode = CEPH_OSD_OP_WRITEFULL;
2219     else
2220         opcode = CEPH_OSD_OP_WRITE;
2221 
2222     osd_req_op_extent_init(osd_req, which, opcode,
2223                    obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2224     rbd_osd_setup_data(osd_req, which);
2225 }
2226 
2227 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2228 {
2229     int ret;
2230 
2231     /* reverse map the entire object onto the parent */
2232     ret = rbd_obj_calc_img_extents(obj_req, true);
2233     if (ret)
2234         return ret;
2235 
2236     if (rbd_obj_copyup_enabled(obj_req))
2237         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2238 
2239     obj_req->write_state = RBD_OBJ_WRITE_START;
2240     return 0;
2241 }
2242 
2243 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2244 {
2245     return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2246                       CEPH_OSD_OP_ZERO;
2247 }
2248 
2249 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2250                     int which)
2251 {
2252     struct rbd_obj_request *obj_req = osd_req->r_priv;
2253 
2254     if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2255         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2256         osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2257     } else {
2258         osd_req_op_extent_init(osd_req, which,
2259                        truncate_or_zero_opcode(obj_req),
2260                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2261                        0, 0);
2262     }
2263 }
2264 
2265 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2266 {
2267     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2268     u64 off, next_off;
2269     int ret;
2270 
2271     /*
2272      * Align the range to alloc_size boundary and punt on discards
2273      * that are too small to free up any space.
2274      *
2275      * alloc_size == object_size && is_tail() is a special case for
2276      * filestore with filestore_punch_hole = false, needed to allow
2277      * truncate (in addition to delete).
2278      */
2279     if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2280         !rbd_obj_is_tail(obj_req)) {
2281         off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2282         next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2283                       rbd_dev->opts->alloc_size);
2284         if (off >= next_off)
2285             return 1;
2286 
2287         dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2288              obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2289              off, next_off - off);
2290         obj_req->ex.oe_off = off;
2291         obj_req->ex.oe_len = next_off - off;
2292     }
2293 
2294     /* reverse map the entire object onto the parent */
2295     ret = rbd_obj_calc_img_extents(obj_req, true);
2296     if (ret)
2297         return ret;
2298 
2299     obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2300     if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2301         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2302 
2303     obj_req->write_state = RBD_OBJ_WRITE_START;
2304     return 0;
2305 }
2306 
2307 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2308                     int which)
2309 {
2310     struct rbd_obj_request *obj_req = osd_req->r_priv;
2311     u16 opcode;
2312 
2313     if (rbd_obj_is_entire(obj_req)) {
2314         if (obj_req->num_img_extents) {
2315             if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2316                 osd_req_op_init(osd_req, which++,
2317                         CEPH_OSD_OP_CREATE, 0);
2318             opcode = CEPH_OSD_OP_TRUNCATE;
2319         } else {
2320             rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2321             osd_req_op_init(osd_req, which++,
2322                     CEPH_OSD_OP_DELETE, 0);
2323             opcode = 0;
2324         }
2325     } else {
2326         opcode = truncate_or_zero_opcode(obj_req);
2327     }
2328 
2329     if (opcode)
2330         osd_req_op_extent_init(osd_req, which, opcode,
2331                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2332                        0, 0);
2333 }
2334 
2335 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2336 {
2337     int ret;
2338 
2339     /* reverse map the entire object onto the parent */
2340     ret = rbd_obj_calc_img_extents(obj_req, true);
2341     if (ret)
2342         return ret;
2343 
2344     if (rbd_obj_copyup_enabled(obj_req))
2345         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2346     if (!obj_req->num_img_extents) {
2347         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2348         if (rbd_obj_is_entire(obj_req))
2349             obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2350     }
2351 
2352     obj_req->write_state = RBD_OBJ_WRITE_START;
2353     return 0;
2354 }
2355 
2356 static int count_write_ops(struct rbd_obj_request *obj_req)
2357 {
2358     struct rbd_img_request *img_req = obj_req->img_request;
2359 
2360     switch (img_req->op_type) {
2361     case OBJ_OP_WRITE:
2362         if (!use_object_map(img_req->rbd_dev) ||
2363             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2364             return 2; /* setallochint + write/writefull */
2365 
2366         return 1; /* write/writefull */
2367     case OBJ_OP_DISCARD:
2368         return 1; /* delete/truncate/zero */
2369     case OBJ_OP_ZEROOUT:
2370         if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2371             !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2372             return 2; /* create + truncate */
2373 
2374         return 1; /* delete/truncate/zero */
2375     default:
2376         BUG();
2377     }
2378 }
2379 
2380 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2381                     int which)
2382 {
2383     struct rbd_obj_request *obj_req = osd_req->r_priv;
2384 
2385     switch (obj_req->img_request->op_type) {
2386     case OBJ_OP_WRITE:
2387         __rbd_osd_setup_write_ops(osd_req, which);
2388         break;
2389     case OBJ_OP_DISCARD:
2390         __rbd_osd_setup_discard_ops(osd_req, which);
2391         break;
2392     case OBJ_OP_ZEROOUT:
2393         __rbd_osd_setup_zeroout_ops(osd_req, which);
2394         break;
2395     default:
2396         BUG();
2397     }
2398 }
2399 
2400 /*
2401  * Prune the list of object requests (adjust offset and/or length, drop
2402  * redundant requests).  Prepare object request state machines and image
2403  * request state machine for execution.
2404  */
2405 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2406 {
2407     struct rbd_obj_request *obj_req, *next_obj_req;
2408     int ret;
2409 
2410     for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2411         switch (img_req->op_type) {
2412         case OBJ_OP_READ:
2413             ret = rbd_obj_init_read(obj_req);
2414             break;
2415         case OBJ_OP_WRITE:
2416             ret = rbd_obj_init_write(obj_req);
2417             break;
2418         case OBJ_OP_DISCARD:
2419             ret = rbd_obj_init_discard(obj_req);
2420             break;
2421         case OBJ_OP_ZEROOUT:
2422             ret = rbd_obj_init_zeroout(obj_req);
2423             break;
2424         default:
2425             BUG();
2426         }
2427         if (ret < 0)
2428             return ret;
2429         if (ret > 0) {
2430             rbd_img_obj_request_del(img_req, obj_req);
2431             continue;
2432         }
2433     }
2434 
2435     img_req->state = RBD_IMG_START;
2436     return 0;
2437 }
2438 
2439 union rbd_img_fill_iter {
2440     struct ceph_bio_iter    bio_iter;
2441     struct ceph_bvec_iter   bvec_iter;
2442 };
2443 
2444 struct rbd_img_fill_ctx {
2445     enum obj_request_type   pos_type;
2446     union rbd_img_fill_iter *pos;
2447     union rbd_img_fill_iter iter;
2448     ceph_object_extent_fn_t set_pos_fn;
2449     ceph_object_extent_fn_t count_fn;
2450     ceph_object_extent_fn_t copy_fn;
2451 };
2452 
2453 static struct ceph_object_extent *alloc_object_extent(void *arg)
2454 {
2455     struct rbd_img_request *img_req = arg;
2456     struct rbd_obj_request *obj_req;
2457 
2458     obj_req = rbd_obj_request_create();
2459     if (!obj_req)
2460         return NULL;
2461 
2462     rbd_img_obj_request_add(img_req, obj_req);
2463     return &obj_req->ex;
2464 }
2465 
2466 /*
2467  * While su != os && sc == 1 is technically not fancy (it's the same
2468  * layout as su == os && sc == 1), we can't use the nocopy path for it
2469  * because ->set_pos_fn() should be called only once per object.
2470  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2471  * treat su != os && sc == 1 as fancy.
2472  */
2473 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2474 {
2475     return l->stripe_unit != l->object_size;
2476 }
2477 
2478 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2479                        struct ceph_file_extent *img_extents,
2480                        u32 num_img_extents,
2481                        struct rbd_img_fill_ctx *fctx)
2482 {
2483     u32 i;
2484     int ret;
2485 
2486     img_req->data_type = fctx->pos_type;
2487 
2488     /*
2489      * Create object requests and set each object request's starting
2490      * position in the provided bio (list) or bio_vec array.
2491      */
2492     fctx->iter = *fctx->pos;
2493     for (i = 0; i < num_img_extents; i++) {
2494         ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2495                        img_extents[i].fe_off,
2496                        img_extents[i].fe_len,
2497                        &img_req->object_extents,
2498                        alloc_object_extent, img_req,
2499                        fctx->set_pos_fn, &fctx->iter);
2500         if (ret)
2501             return ret;
2502     }
2503 
2504     return __rbd_img_fill_request(img_req);
2505 }
2506 
2507 /*
2508  * Map a list of image extents to a list of object extents, create the
2509  * corresponding object requests (normally each to a different object,
2510  * but not always) and add them to @img_req.  For each object request,
2511  * set up its data descriptor to point to the corresponding chunk(s) of
2512  * @fctx->pos data buffer.
2513  *
2514  * Because ceph_file_to_extents() will merge adjacent object extents
2515  * together, each object request's data descriptor may point to multiple
2516  * different chunks of @fctx->pos data buffer.
2517  *
2518  * @fctx->pos data buffer is assumed to be large enough.
2519  */
2520 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2521                 struct ceph_file_extent *img_extents,
2522                 u32 num_img_extents,
2523                 struct rbd_img_fill_ctx *fctx)
2524 {
2525     struct rbd_device *rbd_dev = img_req->rbd_dev;
2526     struct rbd_obj_request *obj_req;
2527     u32 i;
2528     int ret;
2529 
2530     if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2531         !rbd_layout_is_fancy(&rbd_dev->layout))
2532         return rbd_img_fill_request_nocopy(img_req, img_extents,
2533                            num_img_extents, fctx);
2534 
2535     img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2536 
2537     /*
2538      * Create object requests and determine ->bvec_count for each object
2539      * request.  Note that ->bvec_count sum over all object requests may
2540      * be greater than the number of bio_vecs in the provided bio (list)
2541      * or bio_vec array because when mapped, those bio_vecs can straddle
2542      * stripe unit boundaries.
2543      */
2544     fctx->iter = *fctx->pos;
2545     for (i = 0; i < num_img_extents; i++) {
2546         ret = ceph_file_to_extents(&rbd_dev->layout,
2547                        img_extents[i].fe_off,
2548                        img_extents[i].fe_len,
2549                        &img_req->object_extents,
2550                        alloc_object_extent, img_req,
2551                        fctx->count_fn, &fctx->iter);
2552         if (ret)
2553             return ret;
2554     }
2555 
2556     for_each_obj_request(img_req, obj_req) {
2557         obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2558                           sizeof(*obj_req->bvec_pos.bvecs),
2559                           GFP_NOIO);
2560         if (!obj_req->bvec_pos.bvecs)
2561             return -ENOMEM;
2562     }
2563 
2564     /*
2565      * Fill in each object request's private bio_vec array, splitting and
2566      * rearranging the provided bio_vecs in stripe unit chunks as needed.
2567      */
2568     fctx->iter = *fctx->pos;
2569     for (i = 0; i < num_img_extents; i++) {
2570         ret = ceph_iterate_extents(&rbd_dev->layout,
2571                        img_extents[i].fe_off,
2572                        img_extents[i].fe_len,
2573                        &img_req->object_extents,
2574                        fctx->copy_fn, &fctx->iter);
2575         if (ret)
2576             return ret;
2577     }
2578 
2579     return __rbd_img_fill_request(img_req);
2580 }
2581 
2582 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2583                    u64 off, u64 len)
2584 {
2585     struct ceph_file_extent ex = { off, len };
2586     union rbd_img_fill_iter dummy = {};
2587     struct rbd_img_fill_ctx fctx = {
2588         .pos_type = OBJ_REQUEST_NODATA,
2589         .pos = &dummy,
2590     };
2591 
2592     return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2593 }
2594 
2595 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2596 {
2597     struct rbd_obj_request *obj_req =
2598         container_of(ex, struct rbd_obj_request, ex);
2599     struct ceph_bio_iter *it = arg;
2600 
2601     dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2602     obj_req->bio_pos = *it;
2603     ceph_bio_iter_advance(it, bytes);
2604 }
2605 
2606 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2607 {
2608     struct rbd_obj_request *obj_req =
2609         container_of(ex, struct rbd_obj_request, ex);
2610     struct ceph_bio_iter *it = arg;
2611 
2612     dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2613     ceph_bio_iter_advance_step(it, bytes, ({
2614         obj_req->bvec_count++;
2615     }));
2616 
2617 }
2618 
2619 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2620 {
2621     struct rbd_obj_request *obj_req =
2622         container_of(ex, struct rbd_obj_request, ex);
2623     struct ceph_bio_iter *it = arg;
2624 
2625     dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2626     ceph_bio_iter_advance_step(it, bytes, ({
2627         obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2628         obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2629     }));
2630 }
2631 
2632 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2633                    struct ceph_file_extent *img_extents,
2634                    u32 num_img_extents,
2635                    struct ceph_bio_iter *bio_pos)
2636 {
2637     struct rbd_img_fill_ctx fctx = {
2638         .pos_type = OBJ_REQUEST_BIO,
2639         .pos = (union rbd_img_fill_iter *)bio_pos,
2640         .set_pos_fn = set_bio_pos,
2641         .count_fn = count_bio_bvecs,
2642         .copy_fn = copy_bio_bvecs,
2643     };
2644 
2645     return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2646                     &fctx);
2647 }
2648 
2649 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2650                  u64 off, u64 len, struct bio *bio)
2651 {
2652     struct ceph_file_extent ex = { off, len };
2653     struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2654 
2655     return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2656 }
2657 
2658 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2659 {
2660     struct rbd_obj_request *obj_req =
2661         container_of(ex, struct rbd_obj_request, ex);
2662     struct ceph_bvec_iter *it = arg;
2663 
2664     obj_req->bvec_pos = *it;
2665     ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2666     ceph_bvec_iter_advance(it, bytes);
2667 }
2668 
2669 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2670 {
2671     struct rbd_obj_request *obj_req =
2672         container_of(ex, struct rbd_obj_request, ex);
2673     struct ceph_bvec_iter *it = arg;
2674 
2675     ceph_bvec_iter_advance_step(it, bytes, ({
2676         obj_req->bvec_count++;
2677     }));
2678 }
2679 
2680 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2681 {
2682     struct rbd_obj_request *obj_req =
2683         container_of(ex, struct rbd_obj_request, ex);
2684     struct ceph_bvec_iter *it = arg;
2685 
2686     ceph_bvec_iter_advance_step(it, bytes, ({
2687         obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2688         obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2689     }));
2690 }
2691 
2692 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2693                      struct ceph_file_extent *img_extents,
2694                      u32 num_img_extents,
2695                      struct ceph_bvec_iter *bvec_pos)
2696 {
2697     struct rbd_img_fill_ctx fctx = {
2698         .pos_type = OBJ_REQUEST_BVECS,
2699         .pos = (union rbd_img_fill_iter *)bvec_pos,
2700         .set_pos_fn = set_bvec_pos,
2701         .count_fn = count_bvecs,
2702         .copy_fn = copy_bvecs,
2703     };
2704 
2705     return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2706                     &fctx);
2707 }
2708 
2709 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2710                    struct ceph_file_extent *img_extents,
2711                    u32 num_img_extents,
2712                    struct bio_vec *bvecs)
2713 {
2714     struct ceph_bvec_iter it = {
2715         .bvecs = bvecs,
2716         .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2717                                  num_img_extents) },
2718     };
2719 
2720     return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2721                      &it);
2722 }
2723 
2724 static void rbd_img_handle_request_work(struct work_struct *work)
2725 {
2726     struct rbd_img_request *img_req =
2727         container_of(work, struct rbd_img_request, work);
2728 
2729     rbd_img_handle_request(img_req, img_req->work_result);
2730 }
2731 
2732 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2733 {
2734     INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2735     img_req->work_result = result;
2736     queue_work(rbd_wq, &img_req->work);
2737 }
2738 
2739 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2740 {
2741     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2742 
2743     if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2744         obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2745         return true;
2746     }
2747 
2748     dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2749          obj_req->ex.oe_objno);
2750     return false;
2751 }
2752 
2753 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2754 {
2755     struct ceph_osd_request *osd_req;
2756     int ret;
2757 
2758     osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2759     if (IS_ERR(osd_req))
2760         return PTR_ERR(osd_req);
2761 
2762     osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2763                    obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2764     rbd_osd_setup_data(osd_req, 0);
2765     rbd_osd_format_read(osd_req);
2766 
2767     ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2768     if (ret)
2769         return ret;
2770 
2771     rbd_osd_submit(osd_req);
2772     return 0;
2773 }
2774 
2775 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2776 {
2777     struct rbd_img_request *img_req = obj_req->img_request;
2778     struct rbd_device *parent = img_req->rbd_dev->parent;
2779     struct rbd_img_request *child_img_req;
2780     int ret;
2781 
2782     child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2783     if (!child_img_req)
2784         return -ENOMEM;
2785 
2786     rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2787     __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2788     child_img_req->obj_request = obj_req;
2789 
2790     down_read(&parent->header_rwsem);
2791     rbd_img_capture_header(child_img_req);
2792     up_read(&parent->header_rwsem);
2793 
2794     dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2795          obj_req);
2796 
2797     if (!rbd_img_is_write(img_req)) {
2798         switch (img_req->data_type) {
2799         case OBJ_REQUEST_BIO:
2800             ret = __rbd_img_fill_from_bio(child_img_req,
2801                               obj_req->img_extents,
2802                               obj_req->num_img_extents,
2803                               &obj_req->bio_pos);
2804             break;
2805         case OBJ_REQUEST_BVECS:
2806         case OBJ_REQUEST_OWN_BVECS:
2807             ret = __rbd_img_fill_from_bvecs(child_img_req,
2808                               obj_req->img_extents,
2809                               obj_req->num_img_extents,
2810                               &obj_req->bvec_pos);
2811             break;
2812         default:
2813             BUG();
2814         }
2815     } else {
2816         ret = rbd_img_fill_from_bvecs(child_img_req,
2817                           obj_req->img_extents,
2818                           obj_req->num_img_extents,
2819                           obj_req->copyup_bvecs);
2820     }
2821     if (ret) {
2822         rbd_img_request_destroy(child_img_req);
2823         return ret;
2824     }
2825 
2826     /* avoid parent chain recursion */
2827     rbd_img_schedule(child_img_req, 0);
2828     return 0;
2829 }
2830 
2831 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2832 {
2833     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2834     int ret;
2835 
2836 again:
2837     switch (obj_req->read_state) {
2838     case RBD_OBJ_READ_START:
2839         rbd_assert(!*result);
2840 
2841         if (!rbd_obj_may_exist(obj_req)) {
2842             *result = -ENOENT;
2843             obj_req->read_state = RBD_OBJ_READ_OBJECT;
2844             goto again;
2845         }
2846 
2847         ret = rbd_obj_read_object(obj_req);
2848         if (ret) {
2849             *result = ret;
2850             return true;
2851         }
2852         obj_req->read_state = RBD_OBJ_READ_OBJECT;
2853         return false;
2854     case RBD_OBJ_READ_OBJECT:
2855         if (*result == -ENOENT && rbd_dev->parent_overlap) {
2856             /* reverse map this object extent onto the parent */
2857             ret = rbd_obj_calc_img_extents(obj_req, false);
2858             if (ret) {
2859                 *result = ret;
2860                 return true;
2861             }
2862             if (obj_req->num_img_extents) {
2863                 ret = rbd_obj_read_from_parent(obj_req);
2864                 if (ret) {
2865                     *result = ret;
2866                     return true;
2867                 }
2868                 obj_req->read_state = RBD_OBJ_READ_PARENT;
2869                 return false;
2870             }
2871         }
2872 
2873         /*
2874          * -ENOENT means a hole in the image -- zero-fill the entire
2875          * length of the request.  A short read also implies zero-fill
2876          * to the end of the request.
2877          */
2878         if (*result == -ENOENT) {
2879             rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2880             *result = 0;
2881         } else if (*result >= 0) {
2882             if (*result < obj_req->ex.oe_len)
2883                 rbd_obj_zero_range(obj_req, *result,
2884                         obj_req->ex.oe_len - *result);
2885             else
2886                 rbd_assert(*result == obj_req->ex.oe_len);
2887             *result = 0;
2888         }
2889         return true;
2890     case RBD_OBJ_READ_PARENT:
2891         /*
2892          * The parent image is read only up to the overlap -- zero-fill
2893          * from the overlap to the end of the request.
2894          */
2895         if (!*result) {
2896             u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2897 
2898             if (obj_overlap < obj_req->ex.oe_len)
2899                 rbd_obj_zero_range(obj_req, obj_overlap,
2900                         obj_req->ex.oe_len - obj_overlap);
2901         }
2902         return true;
2903     default:
2904         BUG();
2905     }
2906 }
2907 
2908 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2909 {
2910     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2911 
2912     if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2913         obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2914 
2915     if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2916         (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2917         dout("%s %p noop for nonexistent\n", __func__, obj_req);
2918         return true;
2919     }
2920 
2921     return false;
2922 }
2923 
2924 /*
2925  * Return:
2926  *   0 - object map update sent
2927  *   1 - object map update isn't needed
2928  *  <0 - error
2929  */
2930 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2931 {
2932     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2933     u8 new_state;
2934 
2935     if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2936         return 1;
2937 
2938     if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2939         new_state = OBJECT_PENDING;
2940     else
2941         new_state = OBJECT_EXISTS;
2942 
2943     return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2944 }
2945 
2946 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2947 {
2948     struct ceph_osd_request *osd_req;
2949     int num_ops = count_write_ops(obj_req);
2950     int which = 0;
2951     int ret;
2952 
2953     if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2954         num_ops++; /* stat */
2955 
2956     osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2957     if (IS_ERR(osd_req))
2958         return PTR_ERR(osd_req);
2959 
2960     if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2961         ret = rbd_osd_setup_stat(osd_req, which++);
2962         if (ret)
2963             return ret;
2964     }
2965 
2966     rbd_osd_setup_write_ops(osd_req, which);
2967     rbd_osd_format_write(osd_req);
2968 
2969     ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2970     if (ret)
2971         return ret;
2972 
2973     rbd_osd_submit(osd_req);
2974     return 0;
2975 }
2976 
2977 /*
2978  * copyup_bvecs pages are never highmem pages
2979  */
2980 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2981 {
2982     struct ceph_bvec_iter it = {
2983         .bvecs = bvecs,
2984         .iter = { .bi_size = bytes },
2985     };
2986 
2987     ceph_bvec_iter_advance_step(&it, bytes, ({
2988         if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len))
2989             return false;
2990     }));
2991     return true;
2992 }
2993 
2994 #define MODS_ONLY   U32_MAX
2995 
2996 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
2997                       u32 bytes)
2998 {
2999     struct ceph_osd_request *osd_req;
3000     int ret;
3001 
3002     dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3003     rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3004 
3005     osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3006     if (IS_ERR(osd_req))
3007         return PTR_ERR(osd_req);
3008 
3009     ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3010     if (ret)
3011         return ret;
3012 
3013     rbd_osd_format_write(osd_req);
3014 
3015     ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3016     if (ret)
3017         return ret;
3018 
3019     rbd_osd_submit(osd_req);
3020     return 0;
3021 }
3022 
3023 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3024                     u32 bytes)
3025 {
3026     struct ceph_osd_request *osd_req;
3027     int num_ops = count_write_ops(obj_req);
3028     int which = 0;
3029     int ret;
3030 
3031     dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3032 
3033     if (bytes != MODS_ONLY)
3034         num_ops++; /* copyup */
3035 
3036     osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3037     if (IS_ERR(osd_req))
3038         return PTR_ERR(osd_req);
3039 
3040     if (bytes != MODS_ONLY) {
3041         ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3042         if (ret)
3043             return ret;
3044     }
3045 
3046     rbd_osd_setup_write_ops(osd_req, which);
3047     rbd_osd_format_write(osd_req);
3048 
3049     ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3050     if (ret)
3051         return ret;
3052 
3053     rbd_osd_submit(osd_req);
3054     return 0;
3055 }
3056 
3057 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3058 {
3059     u32 i;
3060 
3061     rbd_assert(!obj_req->copyup_bvecs);
3062     obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3063     obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3064                     sizeof(*obj_req->copyup_bvecs),
3065                     GFP_NOIO);
3066     if (!obj_req->copyup_bvecs)
3067         return -ENOMEM;
3068 
3069     for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3070         unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3071 
3072         obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3073         if (!obj_req->copyup_bvecs[i].bv_page)
3074             return -ENOMEM;
3075 
3076         obj_req->copyup_bvecs[i].bv_offset = 0;
3077         obj_req->copyup_bvecs[i].bv_len = len;
3078         obj_overlap -= len;
3079     }
3080 
3081     rbd_assert(!obj_overlap);
3082     return 0;
3083 }
3084 
3085 /*
3086  * The target object doesn't exist.  Read the data for the entire
3087  * target object up to the overlap point (if any) from the parent,
3088  * so we can use it for a copyup.
3089  */
3090 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3091 {
3092     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3093     int ret;
3094 
3095     rbd_assert(obj_req->num_img_extents);
3096     prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3097               rbd_dev->parent_overlap);
3098     if (!obj_req->num_img_extents) {
3099         /*
3100          * The overlap has become 0 (most likely because the
3101          * image has been flattened).  Re-submit the original write
3102          * request -- pass MODS_ONLY since the copyup isn't needed
3103          * anymore.
3104          */
3105         return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3106     }
3107 
3108     ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3109     if (ret)
3110         return ret;
3111 
3112     return rbd_obj_read_from_parent(obj_req);
3113 }
3114 
3115 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3116 {
3117     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3118     struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3119     u8 new_state;
3120     u32 i;
3121     int ret;
3122 
3123     rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3124 
3125     if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3126         return;
3127 
3128     if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3129         return;
3130 
3131     for (i = 0; i < snapc->num_snaps; i++) {
3132         if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3133             i + 1 < snapc->num_snaps)
3134             new_state = OBJECT_EXISTS_CLEAN;
3135         else
3136             new_state = OBJECT_EXISTS;
3137 
3138         ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3139                         new_state, NULL);
3140         if (ret < 0) {
3141             obj_req->pending.result = ret;
3142             return;
3143         }
3144 
3145         rbd_assert(!ret);
3146         obj_req->pending.num_pending++;
3147     }
3148 }
3149 
3150 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3151 {
3152     u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3153     int ret;
3154 
3155     rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3156 
3157     /*
3158      * Only send non-zero copyup data to save some I/O and network
3159      * bandwidth -- zero copyup data is equivalent to the object not
3160      * existing.
3161      */
3162     if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3163         bytes = 0;
3164 
3165     if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3166         /*
3167          * Send a copyup request with an empty snapshot context to
3168          * deep-copyup the object through all existing snapshots.
3169          * A second request with the current snapshot context will be
3170          * sent for the actual modification.
3171          */
3172         ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3173         if (ret) {
3174             obj_req->pending.result = ret;
3175             return;
3176         }
3177 
3178         obj_req->pending.num_pending++;
3179         bytes = MODS_ONLY;
3180     }
3181 
3182     ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3183     if (ret) {
3184         obj_req->pending.result = ret;
3185         return;
3186     }
3187 
3188     obj_req->pending.num_pending++;
3189 }
3190 
3191 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3192 {
3193     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3194     int ret;
3195 
3196 again:
3197     switch (obj_req->copyup_state) {
3198     case RBD_OBJ_COPYUP_START:
3199         rbd_assert(!*result);
3200 
3201         ret = rbd_obj_copyup_read_parent(obj_req);
3202         if (ret) {
3203             *result = ret;
3204             return true;
3205         }
3206         if (obj_req->num_img_extents)
3207             obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3208         else
3209             obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3210         return false;
3211     case RBD_OBJ_COPYUP_READ_PARENT:
3212         if (*result)
3213             return true;
3214 
3215         if (is_zero_bvecs(obj_req->copyup_bvecs,
3216                   rbd_obj_img_extents_bytes(obj_req))) {
3217             dout("%s %p detected zeros\n", __func__, obj_req);
3218             obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3219         }
3220 
3221         rbd_obj_copyup_object_maps(obj_req);
3222         if (!obj_req->pending.num_pending) {
3223             *result = obj_req->pending.result;
3224             obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3225             goto again;
3226         }
3227         obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3228         return false;
3229     case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3230         if (!pending_result_dec(&obj_req->pending, result))
3231             return false;
3232         fallthrough;
3233     case RBD_OBJ_COPYUP_OBJECT_MAPS:
3234         if (*result) {
3235             rbd_warn(rbd_dev, "snap object map update failed: %d",
3236                  *result);
3237             return true;
3238         }
3239 
3240         rbd_obj_copyup_write_object(obj_req);
3241         if (!obj_req->pending.num_pending) {
3242             *result = obj_req->pending.result;
3243             obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3244             goto again;
3245         }
3246         obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3247         return false;
3248     case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3249         if (!pending_result_dec(&obj_req->pending, result))
3250             return false;
3251         fallthrough;
3252     case RBD_OBJ_COPYUP_WRITE_OBJECT:
3253         return true;
3254     default:
3255         BUG();
3256     }
3257 }
3258 
3259 /*
3260  * Return:
3261  *   0 - object map update sent
3262  *   1 - object map update isn't needed
3263  *  <0 - error
3264  */
3265 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3266 {
3267     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3268     u8 current_state = OBJECT_PENDING;
3269 
3270     if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3271         return 1;
3272 
3273     if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3274         return 1;
3275 
3276     return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3277                      &current_state);
3278 }
3279 
3280 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3281 {
3282     struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3283     int ret;
3284 
3285 again:
3286     switch (obj_req->write_state) {
3287     case RBD_OBJ_WRITE_START:
3288         rbd_assert(!*result);
3289 
3290         if (rbd_obj_write_is_noop(obj_req))
3291             return true;
3292 
3293         ret = rbd_obj_write_pre_object_map(obj_req);
3294         if (ret < 0) {
3295             *result = ret;
3296             return true;
3297         }
3298         obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3299         if (ret > 0)
3300             goto again;
3301         return false;
3302     case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3303         if (*result) {
3304             rbd_warn(rbd_dev, "pre object map update failed: %d",
3305                  *result);
3306             return true;
3307         }
3308         ret = rbd_obj_write_object(obj_req);
3309         if (ret) {
3310             *result = ret;
3311             return true;
3312         }
3313         obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3314         return false;
3315     case RBD_OBJ_WRITE_OBJECT:
3316         if (*result == -ENOENT) {
3317             if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3318                 *result = 0;
3319                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3320                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3321                 goto again;
3322             }
3323             /*
3324              * On a non-existent object:
3325              *   delete - -ENOENT, truncate/zero - 0
3326              */
3327             if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3328                 *result = 0;
3329         }
3330         if (*result)
3331             return true;
3332 
3333         obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3334         goto again;
3335     case __RBD_OBJ_WRITE_COPYUP:
3336         if (!rbd_obj_advance_copyup(obj_req, result))
3337             return false;
3338         fallthrough;
3339     case RBD_OBJ_WRITE_COPYUP:
3340         if (*result) {
3341             rbd_warn(rbd_dev, "copyup failed: %d", *result);
3342             return true;
3343         }
3344         ret = rbd_obj_write_post_object_map(obj_req);
3345         if (ret < 0) {
3346             *result = ret;
3347             return true;
3348         }
3349         obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3350         if (ret > 0)
3351             goto again;
3352         return false;
3353     case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3354         if (*result)
3355             rbd_warn(rbd_dev, "post object map update failed: %d",
3356                  *result);
3357         return true;
3358     default:
3359         BUG();
3360     }
3361 }
3362 
3363 /*
3364  * Return true if @obj_req is completed.
3365  */
3366 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3367                      int *result)
3368 {
3369     struct rbd_img_request *img_req = obj_req->img_request;
3370     struct rbd_device *rbd_dev = img_req->rbd_dev;
3371     bool done;
3372 
3373     mutex_lock(&obj_req->state_mutex);
3374     if (!rbd_img_is_write(img_req))
3375         done = rbd_obj_advance_read(obj_req, result);
3376     else
3377         done = rbd_obj_advance_write(obj_req, result);
3378     mutex_unlock(&obj_req->state_mutex);
3379 
3380     if (done && *result) {
3381         rbd_assert(*result < 0);
3382         rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3383              obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3384              obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3385     }
3386     return done;
3387 }
3388 
3389 /*
3390  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3391  * recursion.
3392  */
3393 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3394 {
3395     if (__rbd_obj_handle_request(obj_req, &result))
3396         rbd_img_handle_request(obj_req->img_request, result);
3397 }
3398 
3399 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3400 {
3401     struct rbd_device *rbd_dev = img_req->rbd_dev;
3402 
3403     if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3404         return false;
3405 
3406     if (rbd_is_ro(rbd_dev))
3407         return false;
3408 
3409     rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3410     if (rbd_dev->opts->lock_on_read ||
3411         (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3412         return true;
3413 
3414     return rbd_img_is_write(img_req);
3415 }
3416 
3417 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3418 {
3419     struct rbd_device *rbd_dev = img_req->rbd_dev;
3420     bool locked;
3421 
3422     lockdep_assert_held(&rbd_dev->lock_rwsem);
3423     locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3424     spin_lock(&rbd_dev->lock_lists_lock);
3425     rbd_assert(list_empty(&img_req->lock_item));
3426     if (!locked)
3427         list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3428     else
3429         list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3430     spin_unlock(&rbd_dev->lock_lists_lock);
3431     return locked;
3432 }
3433 
3434 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3435 {
3436     struct rbd_device *rbd_dev = img_req->rbd_dev;
3437     bool need_wakeup;
3438 
3439     lockdep_assert_held(&rbd_dev->lock_rwsem);
3440     spin_lock(&rbd_dev->lock_lists_lock);
3441     rbd_assert(!list_empty(&img_req->lock_item));
3442     list_del_init(&img_req->lock_item);
3443     need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3444                list_empty(&rbd_dev->running_list));
3445     spin_unlock(&rbd_dev->lock_lists_lock);
3446     if (need_wakeup)
3447         complete(&rbd_dev->releasing_wait);
3448 }
3449 
3450 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3451 {
3452     struct rbd_device *rbd_dev = img_req->rbd_dev;
3453 
3454     if (!need_exclusive_lock(img_req))
3455         return 1;
3456 
3457     if (rbd_lock_add_request(img_req))
3458         return 1;
3459 
3460     if (rbd_dev->opts->exclusive) {
3461         WARN_ON(1); /* lock got released? */
3462         return -EROFS;
3463     }
3464 
3465     /*
3466      * Note the use of mod_delayed_work() in rbd_acquire_lock()
3467      * and cancel_delayed_work() in wake_lock_waiters().
3468      */
3469     dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3470     queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3471     return 0;
3472 }
3473 
3474 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3475 {
3476     struct rbd_obj_request *obj_req;
3477 
3478     rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3479 
3480     for_each_obj_request(img_req, obj_req) {
3481         int result = 0;
3482 
3483         if (__rbd_obj_handle_request(obj_req, &result)) {
3484             if (result) {
3485                 img_req->pending.result = result;
3486                 return;
3487             }
3488         } else {
3489             img_req->pending.num_pending++;
3490         }
3491     }
3492 }
3493 
3494 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3495 {
3496     struct rbd_device *rbd_dev = img_req->rbd_dev;
3497     int ret;
3498 
3499 again:
3500     switch (img_req->state) {
3501     case RBD_IMG_START:
3502         rbd_assert(!*result);
3503 
3504         ret = rbd_img_exclusive_lock(img_req);
3505         if (ret < 0) {
3506             *result = ret;
3507             return true;
3508         }
3509         img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3510         if (ret > 0)
3511             goto again;
3512         return false;
3513     case RBD_IMG_EXCLUSIVE_LOCK:
3514         if (*result)
3515             return true;
3516 
3517         rbd_assert(!need_exclusive_lock(img_req) ||
3518                __rbd_is_lock_owner(rbd_dev));
3519 
3520         rbd_img_object_requests(img_req);
3521         if (!img_req->pending.num_pending) {
3522             *result = img_req->pending.result;
3523             img_req->state = RBD_IMG_OBJECT_REQUESTS;
3524             goto again;
3525         }
3526         img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3527         return false;
3528     case __RBD_IMG_OBJECT_REQUESTS:
3529         if (!pending_result_dec(&img_req->pending, result))
3530             return false;
3531         fallthrough;
3532     case RBD_IMG_OBJECT_REQUESTS:
3533         return true;
3534     default:
3535         BUG();
3536     }
3537 }
3538 
3539 /*
3540  * Return true if @img_req is completed.
3541  */
3542 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3543                      int *result)
3544 {
3545     struct rbd_device *rbd_dev = img_req->rbd_dev;
3546     bool done;
3547 
3548     if (need_exclusive_lock(img_req)) {
3549         down_read(&rbd_dev->lock_rwsem);
3550         mutex_lock(&img_req->state_mutex);
3551         done = rbd_img_advance(img_req, result);
3552         if (done)
3553             rbd_lock_del_request(img_req);
3554         mutex_unlock(&img_req->state_mutex);
3555         up_read(&rbd_dev->lock_rwsem);
3556     } else {
3557         mutex_lock(&img_req->state_mutex);
3558         done = rbd_img_advance(img_req, result);
3559         mutex_unlock(&img_req->state_mutex);
3560     }
3561 
3562     if (done && *result) {
3563         rbd_assert(*result < 0);
3564         rbd_warn(rbd_dev, "%s%s result %d",
3565               test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3566               obj_op_name(img_req->op_type), *result);
3567     }
3568     return done;
3569 }
3570 
3571 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3572 {
3573 again:
3574     if (!__rbd_img_handle_request(img_req, &result))
3575         return;
3576 
3577     if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3578         struct rbd_obj_request *obj_req = img_req->obj_request;
3579 
3580         rbd_img_request_destroy(img_req);
3581         if (__rbd_obj_handle_request(obj_req, &result)) {
3582             img_req = obj_req->img_request;
3583             goto again;
3584         }
3585     } else {
3586         struct request *rq = blk_mq_rq_from_pdu(img_req);
3587 
3588         rbd_img_request_destroy(img_req);
3589         blk_mq_end_request(rq, errno_to_blk_status(result));
3590     }
3591 }
3592 
3593 static const struct rbd_client_id rbd_empty_cid;
3594 
3595 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3596               const struct rbd_client_id *rhs)
3597 {
3598     return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3599 }
3600 
3601 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3602 {
3603     struct rbd_client_id cid;
3604 
3605     mutex_lock(&rbd_dev->watch_mutex);
3606     cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3607     cid.handle = rbd_dev->watch_cookie;
3608     mutex_unlock(&rbd_dev->watch_mutex);
3609     return cid;
3610 }
3611 
3612 /*
3613  * lock_rwsem must be held for write
3614  */
3615 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3616                   const struct rbd_client_id *cid)
3617 {
3618     dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3619          rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3620          cid->gid, cid->handle);
3621     rbd_dev->owner_cid = *cid; /* struct */
3622 }
3623 
3624 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3625 {
3626     mutex_lock(&rbd_dev->watch_mutex);
3627     sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3628     mutex_unlock(&rbd_dev->watch_mutex);
3629 }
3630 
3631 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3632 {
3633     struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3634 
3635     rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3636     strcpy(rbd_dev->lock_cookie, cookie);
3637     rbd_set_owner_cid(rbd_dev, &cid);
3638     queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3639 }
3640 
3641 /*
3642  * lock_rwsem must be held for write
3643  */
3644 static int rbd_lock(struct rbd_device *rbd_dev)
3645 {
3646     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3647     char cookie[32];
3648     int ret;
3649 
3650     WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3651         rbd_dev->lock_cookie[0] != '\0');
3652 
3653     format_lock_cookie(rbd_dev, cookie);
3654     ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3655                 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3656                 RBD_LOCK_TAG, "", 0);
3657     if (ret)
3658         return ret;
3659 
3660     __rbd_lock(rbd_dev, cookie);
3661     return 0;
3662 }
3663 
3664 /*
3665  * lock_rwsem must be held for write
3666  */
3667 static void rbd_unlock(struct rbd_device *rbd_dev)
3668 {
3669     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3670     int ret;
3671 
3672     WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3673         rbd_dev->lock_cookie[0] == '\0');
3674 
3675     ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3676                   RBD_LOCK_NAME, rbd_dev->lock_cookie);
3677     if (ret && ret != -ENOENT)
3678         rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3679 
3680     /* treat errors as the image is unlocked */
3681     rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3682     rbd_dev->lock_cookie[0] = '\0';
3683     rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3684     queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3685 }
3686 
3687 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3688                 enum rbd_notify_op notify_op,
3689                 struct page ***preply_pages,
3690                 size_t *preply_len)
3691 {
3692     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3693     struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3694     char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3695     int buf_size = sizeof(buf);
3696     void *p = buf;
3697 
3698     dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3699 
3700     /* encode *LockPayload NotifyMessage (op + ClientId) */
3701     ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3702     ceph_encode_32(&p, notify_op);
3703     ceph_encode_64(&p, cid.gid);
3704     ceph_encode_64(&p, cid.handle);
3705 
3706     return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3707                 &rbd_dev->header_oloc, buf, buf_size,
3708                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3709 }
3710 
3711 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3712                    enum rbd_notify_op notify_op)
3713 {
3714     __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3715 }
3716 
3717 static void rbd_notify_acquired_lock(struct work_struct *work)
3718 {
3719     struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3720                           acquired_lock_work);
3721 
3722     rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3723 }
3724 
3725 static void rbd_notify_released_lock(struct work_struct *work)
3726 {
3727     struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3728                           released_lock_work);
3729 
3730     rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3731 }
3732 
3733 static int rbd_request_lock(struct rbd_device *rbd_dev)
3734 {
3735     struct page **reply_pages;
3736     size_t reply_len;
3737     bool lock_owner_responded = false;
3738     int ret;
3739 
3740     dout("%s rbd_dev %p\n", __func__, rbd_dev);
3741 
3742     ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3743                    &reply_pages, &reply_len);
3744     if (ret && ret != -ETIMEDOUT) {
3745         rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3746         goto out;
3747     }
3748 
3749     if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3750         void *p = page_address(reply_pages[0]);
3751         void *const end = p + reply_len;
3752         u32 n;
3753 
3754         ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3755         while (n--) {
3756             u8 struct_v;
3757             u32 len;
3758 
3759             ceph_decode_need(&p, end, 8 + 8, e_inval);
3760             p += 8 + 8; /* skip gid and cookie */
3761 
3762             ceph_decode_32_safe(&p, end, len, e_inval);
3763             if (!len)
3764                 continue;
3765 
3766             if (lock_owner_responded) {
3767                 rbd_warn(rbd_dev,
3768                      "duplicate lock owners detected");
3769                 ret = -EIO;
3770                 goto out;
3771             }
3772 
3773             lock_owner_responded = true;
3774             ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3775                           &struct_v, &len);
3776             if (ret) {
3777                 rbd_warn(rbd_dev,
3778                      "failed to decode ResponseMessage: %d",
3779                      ret);
3780                 goto e_inval;
3781             }
3782 
3783             ret = ceph_decode_32(&p);
3784         }
3785     }
3786 
3787     if (!lock_owner_responded) {
3788         rbd_warn(rbd_dev, "no lock owners detected");
3789         ret = -ETIMEDOUT;
3790     }
3791 
3792 out:
3793     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3794     return ret;
3795 
3796 e_inval:
3797     ret = -EINVAL;
3798     goto out;
3799 }
3800 
3801 /*
3802  * Either image request state machine(s) or rbd_add_acquire_lock()
3803  * (i.e. "rbd map").
3804  */
3805 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3806 {
3807     struct rbd_img_request *img_req;
3808 
3809     dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3810     lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3811 
3812     cancel_delayed_work(&rbd_dev->lock_dwork);
3813     if (!completion_done(&rbd_dev->acquire_wait)) {
3814         rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3815                list_empty(&rbd_dev->running_list));
3816         rbd_dev->acquire_err = result;
3817         complete_all(&rbd_dev->acquire_wait);
3818         return;
3819     }
3820 
3821     list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3822         mutex_lock(&img_req->state_mutex);
3823         rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3824         rbd_img_schedule(img_req, result);
3825         mutex_unlock(&img_req->state_mutex);
3826     }
3827 
3828     list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3829 }
3830 
3831 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3832                    struct ceph_locker **lockers, u32 *num_lockers)
3833 {
3834     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3835     u8 lock_type;
3836     char *lock_tag;
3837     int ret;
3838 
3839     dout("%s rbd_dev %p\n", __func__, rbd_dev);
3840 
3841     ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3842                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3843                  &lock_type, &lock_tag, lockers, num_lockers);
3844     if (ret)
3845         return ret;
3846 
3847     if (*num_lockers == 0) {
3848         dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3849         goto out;
3850     }
3851 
3852     if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3853         rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3854              lock_tag);
3855         ret = -EBUSY;
3856         goto out;
3857     }
3858 
3859     if (lock_type == CEPH_CLS_LOCK_SHARED) {
3860         rbd_warn(rbd_dev, "shared lock type detected");
3861         ret = -EBUSY;
3862         goto out;
3863     }
3864 
3865     if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3866             strlen(RBD_LOCK_COOKIE_PREFIX))) {
3867         rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3868              (*lockers)[0].id.cookie);
3869         ret = -EBUSY;
3870         goto out;
3871     }
3872 
3873 out:
3874     kfree(lock_tag);
3875     return ret;
3876 }
3877 
3878 static int find_watcher(struct rbd_device *rbd_dev,
3879             const struct ceph_locker *locker)
3880 {
3881     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3882     struct ceph_watch_item *watchers;
3883     u32 num_watchers;
3884     u64 cookie;
3885     int i;
3886     int ret;
3887 
3888     ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3889                       &rbd_dev->header_oloc, &watchers,
3890                       &num_watchers);
3891     if (ret)
3892         return ret;
3893 
3894     sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3895     for (i = 0; i < num_watchers; i++) {
3896         /*
3897          * Ignore addr->type while comparing.  This mimics
3898          * entity_addr_t::get_legacy_str() + strcmp().
3899          */
3900         if (ceph_addr_equal_no_type(&watchers[i].addr,
3901                         &locker->info.addr) &&
3902             watchers[i].cookie == cookie) {
3903             struct rbd_client_id cid = {
3904                 .gid = le64_to_cpu(watchers[i].name.num),
3905                 .handle = cookie,
3906             };
3907 
3908             dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3909                  rbd_dev, cid.gid, cid.handle);
3910             rbd_set_owner_cid(rbd_dev, &cid);
3911             ret = 1;
3912             goto out;
3913         }
3914     }
3915 
3916     dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3917     ret = 0;
3918 out:
3919     kfree(watchers);
3920     return ret;
3921 }
3922 
3923 /*
3924  * lock_rwsem must be held for write
3925  */
3926 static int rbd_try_lock(struct rbd_device *rbd_dev)
3927 {
3928     struct ceph_client *client = rbd_dev->rbd_client->client;
3929     struct ceph_locker *lockers;
3930     u32 num_lockers;
3931     int ret;
3932 
3933     for (;;) {
3934         ret = rbd_lock(rbd_dev);
3935         if (ret != -EBUSY)
3936             return ret;
3937 
3938         /* determine if the current lock holder is still alive */
3939         ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3940         if (ret)
3941             return ret;
3942 
3943         if (num_lockers == 0)
3944             goto again;
3945 
3946         ret = find_watcher(rbd_dev, lockers);
3947         if (ret)
3948             goto out; /* request lock or error */
3949 
3950         rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3951              ENTITY_NAME(lockers[0].id.name));
3952 
3953         ret = ceph_monc_blocklist_add(&client->monc,
3954                           &lockers[0].info.addr);
3955         if (ret) {
3956             rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3957                  ENTITY_NAME(lockers[0].id.name), ret);
3958             goto out;
3959         }
3960 
3961         ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3962                       &rbd_dev->header_oloc, RBD_LOCK_NAME,
3963                       lockers[0].id.cookie,
3964                       &lockers[0].id.name);
3965         if (ret && ret != -ENOENT)
3966             goto out;
3967 
3968 again:
3969         ceph_free_lockers(lockers, num_lockers);
3970     }
3971 
3972 out:
3973     ceph_free_lockers(lockers, num_lockers);
3974     return ret;
3975 }
3976 
3977 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
3978 {
3979     int ret;
3980 
3981     if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
3982         ret = rbd_object_map_open(rbd_dev);
3983         if (ret)
3984             return ret;
3985     }
3986 
3987     return 0;
3988 }
3989 
3990 /*
3991  * Return:
3992  *   0 - lock acquired
3993  *   1 - caller should call rbd_request_lock()
3994  *  <0 - error
3995  */
3996 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
3997 {
3998     int ret;
3999 
4000     down_read(&rbd_dev->lock_rwsem);
4001     dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4002          rbd_dev->lock_state);
4003     if (__rbd_is_lock_owner(rbd_dev)) {
4004         up_read(&rbd_dev->lock_rwsem);
4005         return 0;
4006     }
4007 
4008     up_read(&rbd_dev->lock_rwsem);
4009     down_write(&rbd_dev->lock_rwsem);
4010     dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4011          rbd_dev->lock_state);
4012     if (__rbd_is_lock_owner(rbd_dev)) {
4013         up_write(&rbd_dev->lock_rwsem);
4014         return 0;
4015     }
4016 
4017     ret = rbd_try_lock(rbd_dev);
4018     if (ret < 0) {
4019         rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4020         if (ret == -EBLOCKLISTED)
4021             goto out;
4022 
4023         ret = 1; /* request lock anyway */
4024     }
4025     if (ret > 0) {
4026         up_write(&rbd_dev->lock_rwsem);
4027         return ret;
4028     }
4029 
4030     rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4031     rbd_assert(list_empty(&rbd_dev->running_list));
4032 
4033     ret = rbd_post_acquire_action(rbd_dev);
4034     if (ret) {
4035         rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4036         /*
4037          * Can't stay in RBD_LOCK_STATE_LOCKED because
4038          * rbd_lock_add_request() would let the request through,
4039          * assuming that e.g. object map is locked and loaded.
4040          */
4041         rbd_unlock(rbd_dev);
4042     }
4043 
4044 out:
4045     wake_lock_waiters(rbd_dev, ret);
4046     up_write(&rbd_dev->lock_rwsem);
4047     return ret;
4048 }
4049 
4050 static void rbd_acquire_lock(struct work_struct *work)
4051 {
4052     struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4053                         struct rbd_device, lock_dwork);
4054     int ret;
4055 
4056     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4057 again:
4058     ret = rbd_try_acquire_lock(rbd_dev);
4059     if (ret <= 0) {
4060         dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4061         return;
4062     }
4063 
4064     ret = rbd_request_lock(rbd_dev);
4065     if (ret == -ETIMEDOUT) {
4066         goto again; /* treat this as a dead client */
4067     } else if (ret == -EROFS) {
4068         rbd_warn(rbd_dev, "peer will not release lock");
4069         down_write(&rbd_dev->lock_rwsem);
4070         wake_lock_waiters(rbd_dev, ret);
4071         up_write(&rbd_dev->lock_rwsem);
4072     } else if (ret < 0) {
4073         rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4074         mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4075                  RBD_RETRY_DELAY);
4076     } else {
4077         /*
4078          * lock owner acked, but resend if we don't see them
4079          * release the lock
4080          */
4081         dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4082              rbd_dev);
4083         mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4084             msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4085     }
4086 }
4087 
4088 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4089 {
4090     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4091     lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4092 
4093     if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4094         return false;
4095 
4096     /*
4097      * Ensure that all in-flight IO is flushed.
4098      */
4099     rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4100     rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4101     if (list_empty(&rbd_dev->running_list))
4102         return true;
4103 
4104     up_write(&rbd_dev->lock_rwsem);
4105     wait_for_completion(&rbd_dev->releasing_wait);
4106 
4107     down_write(&rbd_dev->lock_rwsem);
4108     if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4109         return false;
4110 
4111     rbd_assert(list_empty(&rbd_dev->running_list));
4112     return true;
4113 }
4114 
4115 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4116 {
4117     if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4118         rbd_object_map_close(rbd_dev);
4119 }
4120 
4121 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4122 {
4123     rbd_assert(list_empty(&rbd_dev->running_list));
4124 
4125     rbd_pre_release_action(rbd_dev);
4126     rbd_unlock(rbd_dev);
4127 }
4128 
4129 /*
4130  * lock_rwsem must be held for write
4131  */
4132 static void rbd_release_lock(struct rbd_device *rbd_dev)
4133 {
4134     if (!rbd_quiesce_lock(rbd_dev))
4135         return;
4136 
4137     __rbd_release_lock(rbd_dev);
4138 
4139     /*
4140      * Give others a chance to grab the lock - we would re-acquire
4141      * almost immediately if we got new IO while draining the running
4142      * list otherwise.  We need to ack our own notifications, so this
4143      * lock_dwork will be requeued from rbd_handle_released_lock() by
4144      * way of maybe_kick_acquire().
4145      */
4146     cancel_delayed_work(&rbd_dev->lock_dwork);
4147 }
4148 
4149 static void rbd_release_lock_work(struct work_struct *work)
4150 {
4151     struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4152                           unlock_work);
4153 
4154     down_write(&rbd_dev->lock_rwsem);
4155     rbd_release_lock(rbd_dev);
4156     up_write(&rbd_dev->lock_rwsem);
4157 }
4158 
4159 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4160 {
4161     bool have_requests;
4162 
4163     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4164     if (__rbd_is_lock_owner(rbd_dev))
4165         return;
4166 
4167     spin_lock(&rbd_dev->lock_lists_lock);
4168     have_requests = !list_empty(&rbd_dev->acquiring_list);
4169     spin_unlock(&rbd_dev->lock_lists_lock);
4170     if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4171         dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4172         mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4173     }
4174 }
4175 
4176 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4177                      void **p)
4178 {
4179     struct rbd_client_id cid = { 0 };
4180 
4181     if (struct_v >= 2) {
4182         cid.gid = ceph_decode_64(p);
4183         cid.handle = ceph_decode_64(p);
4184     }
4185 
4186     dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4187          cid.handle);
4188     if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4189         down_write(&rbd_dev->lock_rwsem);
4190         if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4191             dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4192                  __func__, rbd_dev, cid.gid, cid.handle);
4193         } else {
4194             rbd_set_owner_cid(rbd_dev, &cid);
4195         }
4196         downgrade_write(&rbd_dev->lock_rwsem);
4197     } else {
4198         down_read(&rbd_dev->lock_rwsem);
4199     }
4200 
4201     maybe_kick_acquire(rbd_dev);
4202     up_read(&rbd_dev->lock_rwsem);
4203 }
4204 
4205 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4206                      void **p)
4207 {
4208     struct rbd_client_id cid = { 0 };
4209 
4210     if (struct_v >= 2) {
4211         cid.gid = ceph_decode_64(p);
4212         cid.handle = ceph_decode_64(p);
4213     }
4214 
4215     dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4216          cid.handle);
4217     if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4218         down_write(&rbd_dev->lock_rwsem);
4219         if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4220             dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4221                  __func__, rbd_dev, cid.gid, cid.handle,
4222                  rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4223         } else {
4224             rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4225         }
4226         downgrade_write(&rbd_dev->lock_rwsem);
4227     } else {
4228         down_read(&rbd_dev->lock_rwsem);
4229     }
4230 
4231     maybe_kick_acquire(rbd_dev);
4232     up_read(&rbd_dev->lock_rwsem);
4233 }
4234 
4235 /*
4236  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4237  * ResponseMessage is needed.
4238  */
4239 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4240                    void **p)
4241 {
4242     struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4243     struct rbd_client_id cid = { 0 };
4244     int result = 1;
4245 
4246     if (struct_v >= 2) {
4247         cid.gid = ceph_decode_64(p);
4248         cid.handle = ceph_decode_64(p);
4249     }
4250 
4251     dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4252          cid.handle);
4253     if (rbd_cid_equal(&cid, &my_cid))
4254         return result;
4255 
4256     down_read(&rbd_dev->lock_rwsem);
4257     if (__rbd_is_lock_owner(rbd_dev)) {
4258         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4259             rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4260             goto out_unlock;
4261 
4262         /*
4263          * encode ResponseMessage(0) so the peer can detect
4264          * a missing owner
4265          */
4266         result = 0;
4267 
4268         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4269             if (!rbd_dev->opts->exclusive) {
4270                 dout("%s rbd_dev %p queueing unlock_work\n",
4271                      __func__, rbd_dev);
4272                 queue_work(rbd_dev->task_wq,
4273                        &rbd_dev->unlock_work);
4274             } else {
4275                 /* refuse to release the lock */
4276                 result = -EROFS;
4277             }
4278         }
4279     }
4280 
4281 out_unlock:
4282     up_read(&rbd_dev->lock_rwsem);
4283     return result;
4284 }
4285 
4286 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4287                      u64 notify_id, u64 cookie, s32 *result)
4288 {
4289     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4290     char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4291     int buf_size = sizeof(buf);
4292     int ret;
4293 
4294     if (result) {
4295         void *p = buf;
4296 
4297         /* encode ResponseMessage */
4298         ceph_start_encoding(&p, 1, 1,
4299                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4300         ceph_encode_32(&p, *result);
4301     } else {
4302         buf_size = 0;
4303     }
4304 
4305     ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4306                    &rbd_dev->header_oloc, notify_id, cookie,
4307                    buf, buf_size);
4308     if (ret)
4309         rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4310 }
4311 
4312 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4313                    u64 cookie)
4314 {
4315     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4316     __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4317 }
4318 
4319 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4320                       u64 notify_id, u64 cookie, s32 result)
4321 {
4322     dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4323     __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4324 }
4325 
4326 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4327              u64 notifier_id, void *data, size_t data_len)
4328 {
4329     struct rbd_device *rbd_dev = arg;
4330     void *p = data;
4331     void *const end = p + data_len;
4332     u8 struct_v = 0;
4333     u32 len;
4334     u32 notify_op;
4335     int ret;
4336 
4337     dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4338          __func__, rbd_dev, cookie, notify_id, data_len);
4339     if (data_len) {
4340         ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4341                       &struct_v, &len);
4342         if (ret) {
4343             rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4344                  ret);
4345             return;
4346         }
4347 
4348         notify_op = ceph_decode_32(&p);
4349     } else {
4350         /* legacy notification for header updates */
4351         notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4352         len = 0;
4353     }
4354 
4355     dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4356     switch (notify_op) {
4357     case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4358         rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4359         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4360         break;
4361     case RBD_NOTIFY_OP_RELEASED_LOCK:
4362         rbd_handle_released_lock(rbd_dev, struct_v, &p);
4363         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4364         break;
4365     case RBD_NOTIFY_OP_REQUEST_LOCK:
4366         ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4367         if (ret <= 0)
4368             rbd_acknowledge_notify_result(rbd_dev, notify_id,
4369                               cookie, ret);
4370         else
4371             rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4372         break;
4373     case RBD_NOTIFY_OP_HEADER_UPDATE:
4374         ret = rbd_dev_refresh(rbd_dev);
4375         if (ret)
4376             rbd_warn(rbd_dev, "refresh failed: %d", ret);
4377 
4378         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4379         break;
4380     default:
4381         if (rbd_is_lock_owner(rbd_dev))
4382             rbd_acknowledge_notify_result(rbd_dev, notify_id,
4383                               cookie, -EOPNOTSUPP);
4384         else
4385             rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4386         break;
4387     }
4388 }
4389 
4390 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4391 
4392 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4393 {
4394     struct rbd_device *rbd_dev = arg;
4395 
4396     rbd_warn(rbd_dev, "encountered watch error: %d", err);
4397 
4398     down_write(&rbd_dev->lock_rwsem);
4399     rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4400     up_write(&rbd_dev->lock_rwsem);
4401 
4402     mutex_lock(&rbd_dev->watch_mutex);
4403     if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4404         __rbd_unregister_watch(rbd_dev);
4405         rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4406 
4407         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4408     }
4409     mutex_unlock(&rbd_dev->watch_mutex);
4410 }
4411 
4412 /*
4413  * watch_mutex must be locked
4414  */
4415 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4416 {
4417     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4418     struct ceph_osd_linger_request *handle;
4419 
4420     rbd_assert(!rbd_dev->watch_handle);
4421     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4422 
4423     handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4424                  &rbd_dev->header_oloc, rbd_watch_cb,
4425                  rbd_watch_errcb, rbd_dev);
4426     if (IS_ERR(handle))
4427         return PTR_ERR(handle);
4428 
4429     rbd_dev->watch_handle = handle;
4430     return 0;
4431 }
4432 
4433 /*
4434  * watch_mutex must be locked
4435  */
4436 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4437 {
4438     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4439     int ret;
4440 
4441     rbd_assert(rbd_dev->watch_handle);
4442     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4443 
4444     ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4445     if (ret)
4446         rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4447 
4448     rbd_dev->watch_handle = NULL;
4449 }
4450 
4451 static int rbd_register_watch(struct rbd_device *rbd_dev)
4452 {
4453     int ret;
4454 
4455     mutex_lock(&rbd_dev->watch_mutex);
4456     rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4457     ret = __rbd_register_watch(rbd_dev);
4458     if (ret)
4459         goto out;
4460 
4461     rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4462     rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4463 
4464 out:
4465     mutex_unlock(&rbd_dev->watch_mutex);
4466     return ret;
4467 }
4468 
4469 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4470 {
4471     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4472 
4473     cancel_work_sync(&rbd_dev->acquired_lock_work);
4474     cancel_work_sync(&rbd_dev->released_lock_work);
4475     cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4476     cancel_work_sync(&rbd_dev->unlock_work);
4477 }
4478 
4479 /*
4480  * header_rwsem must not be held to avoid a deadlock with
4481  * rbd_dev_refresh() when flushing notifies.
4482  */
4483 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4484 {
4485     cancel_tasks_sync(rbd_dev);
4486 
4487     mutex_lock(&rbd_dev->watch_mutex);
4488     if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4489         __rbd_unregister_watch(rbd_dev);
4490     rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4491     mutex_unlock(&rbd_dev->watch_mutex);
4492 
4493     cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4494     ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4495 }
4496 
4497 /*
4498  * lock_rwsem must be held for write
4499  */
4500 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4501 {
4502     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4503     char cookie[32];
4504     int ret;
4505 
4506     if (!rbd_quiesce_lock(rbd_dev))
4507         return;
4508 
4509     format_lock_cookie(rbd_dev, cookie);
4510     ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4511                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4512                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4513                   RBD_LOCK_TAG, cookie);
4514     if (ret) {
4515         if (ret != -EOPNOTSUPP)
4516             rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4517                  ret);
4518 
4519         /*
4520          * Lock cookie cannot be updated on older OSDs, so do
4521          * a manual release and queue an acquire.
4522          */
4523         __rbd_release_lock(rbd_dev);
4524         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4525     } else {
4526         __rbd_lock(rbd_dev, cookie);
4527         wake_lock_waiters(rbd_dev, 0);
4528     }
4529 }
4530 
4531 static void rbd_reregister_watch(struct work_struct *work)
4532 {
4533     struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4534                         struct rbd_device, watch_dwork);
4535     int ret;
4536 
4537     dout("%s rbd_dev %p\n", __func__, rbd_dev);
4538 
4539     mutex_lock(&rbd_dev->watch_mutex);
4540     if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4541         mutex_unlock(&rbd_dev->watch_mutex);
4542         return;
4543     }
4544 
4545     ret = __rbd_register_watch(rbd_dev);
4546     if (ret) {
4547         rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4548         if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4549             queue_delayed_work(rbd_dev->task_wq,
4550                        &rbd_dev->watch_dwork,
4551                        RBD_RETRY_DELAY);
4552             mutex_unlock(&rbd_dev->watch_mutex);
4553             return;
4554         }
4555 
4556         mutex_unlock(&rbd_dev->watch_mutex);
4557         down_write(&rbd_dev->lock_rwsem);
4558         wake_lock_waiters(rbd_dev, ret);
4559         up_write(&rbd_dev->lock_rwsem);
4560         return;
4561     }
4562 
4563     rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4564     rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4565     mutex_unlock(&rbd_dev->watch_mutex);
4566 
4567     down_write(&rbd_dev->lock_rwsem);
4568     if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4569         rbd_reacquire_lock(rbd_dev);
4570     up_write(&rbd_dev->lock_rwsem);
4571 
4572     ret = rbd_dev_refresh(rbd_dev);
4573     if (ret)
4574         rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4575 }
4576 
4577 /*
4578  * Synchronous osd object method call.  Returns the number of bytes
4579  * returned in the outbound buffer, or a negative error code.
4580  */
4581 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4582                  struct ceph_object_id *oid,
4583                  struct ceph_object_locator *oloc,
4584                  const char *method_name,
4585                  const void *outbound,
4586                  size_t outbound_size,
4587                  void *inbound,
4588                  size_t inbound_size)
4589 {
4590     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4591     struct page *req_page = NULL;
4592     struct page *reply_page;
4593     int ret;
4594 
4595     /*
4596      * Method calls are ultimately read operations.  The result
4597      * should placed into the inbound buffer provided.  They
4598      * also supply outbound data--parameters for the object
4599      * method.  Currently if this is present it will be a
4600      * snapshot id.
4601      */
4602     if (outbound) {
4603         if (outbound_size > PAGE_SIZE)
4604             return -E2BIG;
4605 
4606         req_page = alloc_page(GFP_KERNEL);
4607         if (!req_page)
4608             return -ENOMEM;
4609 
4610         memcpy(page_address(req_page), outbound, outbound_size);
4611     }
4612 
4613     reply_page = alloc_page(GFP_KERNEL);
4614     if (!reply_page) {
4615         if (req_page)
4616             __free_page(req_page);
4617         return -ENOMEM;
4618     }
4619 
4620     ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4621                  CEPH_OSD_FLAG_READ, req_page, outbound_size,
4622                  &reply_page, &inbound_size);
4623     if (!ret) {
4624         memcpy(inbound, page_address(reply_page), inbound_size);
4625         ret = inbound_size;
4626     }
4627 
4628     if (req_page)
4629         __free_page(req_page);
4630     __free_page(reply_page);
4631     return ret;
4632 }
4633 
4634 static void rbd_queue_workfn(struct work_struct *work)
4635 {
4636     struct rbd_img_request *img_request =
4637         container_of(work, struct rbd_img_request, work);
4638     struct rbd_device *rbd_dev = img_request->rbd_dev;
4639     enum obj_operation_type op_type = img_request->op_type;
4640     struct request *rq = blk_mq_rq_from_pdu(img_request);
4641     u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4642     u64 length = blk_rq_bytes(rq);
4643     u64 mapping_size;
4644     int result;
4645 
4646     /* Ignore/skip any zero-length requests */
4647     if (!length) {
4648         dout("%s: zero-length request\n", __func__);
4649         result = 0;
4650         goto err_img_request;
4651     }
4652 
4653     blk_mq_start_request(rq);
4654 
4655     down_read(&rbd_dev->header_rwsem);
4656     mapping_size = rbd_dev->mapping.size;
4657     rbd_img_capture_header(img_request);
4658     up_read(&rbd_dev->header_rwsem);
4659 
4660     if (offset + length > mapping_size) {
4661         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4662              length, mapping_size);
4663         result = -EIO;
4664         goto err_img_request;
4665     }
4666 
4667     dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4668          img_request, obj_op_name(op_type), offset, length);
4669 
4670     if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4671         result = rbd_img_fill_nodata(img_request, offset, length);
4672     else
4673         result = rbd_img_fill_from_bio(img_request, offset, length,
4674                            rq->bio);
4675     if (result)
4676         goto err_img_request;
4677 
4678     rbd_img_handle_request(img_request, 0);
4679     return;
4680 
4681 err_img_request:
4682     rbd_img_request_destroy(img_request);
4683     if (result)
4684         rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4685              obj_op_name(op_type), length, offset, result);
4686     blk_mq_end_request(rq, errno_to_blk_status(result));
4687 }
4688 
4689 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4690         const struct blk_mq_queue_data *bd)
4691 {
4692     struct rbd_device *rbd_dev = hctx->queue->queuedata;
4693     struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4694     enum obj_operation_type op_type;
4695 
4696     switch (req_op(bd->rq)) {
4697     case REQ_OP_DISCARD:
4698         op_type = OBJ_OP_DISCARD;
4699         break;
4700     case REQ_OP_WRITE_ZEROES:
4701         op_type = OBJ_OP_ZEROOUT;
4702         break;
4703     case REQ_OP_WRITE:
4704         op_type = OBJ_OP_WRITE;
4705         break;
4706     case REQ_OP_READ:
4707         op_type = OBJ_OP_READ;
4708         break;
4709     default:
4710         rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4711         return BLK_STS_IOERR;
4712     }
4713 
4714     rbd_img_request_init(img_req, rbd_dev, op_type);
4715 
4716     if (rbd_img_is_write(img_req)) {
4717         if (rbd_is_ro(rbd_dev)) {
4718             rbd_warn(rbd_dev, "%s on read-only mapping",
4719                  obj_op_name(img_req->op_type));
4720             return BLK_STS_IOERR;
4721         }
4722         rbd_assert(!rbd_is_snap(rbd_dev));
4723     }
4724 
4725     INIT_WORK(&img_req->work, rbd_queue_workfn);
4726     queue_work(rbd_wq, &img_req->work);
4727     return BLK_STS_OK;
4728 }
4729 
4730 static void rbd_free_disk(struct rbd_device *rbd_dev)
4731 {
4732     put_disk(rbd_dev->disk);
4733     blk_mq_free_tag_set(&rbd_dev->tag_set);
4734     rbd_dev->disk = NULL;
4735 }
4736 
4737 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4738                  struct ceph_object_id *oid,
4739                  struct ceph_object_locator *oloc,
4740                  void *buf, int buf_len)
4741 
4742 {
4743     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4744     struct ceph_osd_request *req;
4745     struct page **pages;
4746     int num_pages = calc_pages_for(0, buf_len);
4747     int ret;
4748 
4749     req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4750     if (!req)
4751         return -ENOMEM;
4752 
4753     ceph_oid_copy(&req->r_base_oid, oid);
4754     ceph_oloc_copy(&req->r_base_oloc, oloc);
4755     req->r_flags = CEPH_OSD_FLAG_READ;
4756 
4757     pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4758     if (IS_ERR(pages)) {
4759         ret = PTR_ERR(pages);
4760         goto out_req;
4761     }
4762 
4763     osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4764     osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4765                      true);
4766 
4767     ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4768     if (ret)
4769         goto out_req;
4770 
4771     ceph_osdc_start_request(osdc, req);
4772     ret = ceph_osdc_wait_request(osdc, req);
4773     if (ret >= 0)
4774         ceph_copy_from_page_vector(pages, buf, 0, ret);
4775 
4776 out_req:
4777     ceph_osdc_put_request(req);
4778     return ret;
4779 }
4780 
4781 /*
4782  * Read the complete header for the given rbd device.  On successful
4783  * return, the rbd_dev->header field will contain up-to-date
4784  * information about the image.
4785  */
4786 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4787 {
4788     struct rbd_image_header_ondisk *ondisk = NULL;
4789     u32 snap_count = 0;
4790     u64 names_size = 0;
4791     u32 want_count;
4792     int ret;
4793 
4794     /*
4795      * The complete header will include an array of its 64-bit
4796      * snapshot ids, followed by the names of those snapshots as
4797      * a contiguous block of NUL-terminated strings.  Note that
4798      * the number of snapshots could change by the time we read
4799      * it in, in which case we re-read it.
4800      */
4801     do {
4802         size_t size;
4803 
4804         kfree(ondisk);
4805 
4806         size = sizeof (*ondisk);
4807         size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4808         size += names_size;
4809         ondisk = kmalloc(size, GFP_KERNEL);
4810         if (!ondisk)
4811             return -ENOMEM;
4812 
4813         ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4814                     &rbd_dev->header_oloc, ondisk, size);
4815         if (ret < 0)
4816             goto out;
4817         if ((size_t)ret < size) {
4818             ret = -ENXIO;
4819             rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4820                 size, ret);
4821             goto out;
4822         }
4823         if (!rbd_dev_ondisk_valid(ondisk)) {
4824             ret = -ENXIO;
4825             rbd_warn(rbd_dev, "invalid header");
4826             goto out;
4827         }
4828 
4829         names_size = le64_to_cpu(ondisk->snap_names_len);
4830         want_count = snap_count;
4831         snap_count = le32_to_cpu(ondisk->snap_count);
4832     } while (snap_count != want_count);
4833 
4834     ret = rbd_header_from_disk(rbd_dev, ondisk);
4835 out:
4836     kfree(ondisk);
4837 
4838     return ret;
4839 }
4840 
4841 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4842 {
4843     sector_t size;
4844 
4845     /*
4846      * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4847      * try to update its size.  If REMOVING is set, updating size
4848      * is just useless work since the device can't be opened.
4849      */
4850     if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4851         !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4852         size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4853         dout("setting size to %llu sectors", (unsigned long long)size);
4854         set_capacity_and_notify(rbd_dev->disk, size);
4855     }
4856 }
4857 
4858 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4859 {
4860     u64 mapping_size;
4861     int ret;
4862 
4863     down_write(&rbd_dev->header_rwsem);
4864     mapping_size = rbd_dev->mapping.size;
4865 
4866     ret = rbd_dev_header_info(rbd_dev);
4867     if (ret)
4868         goto out;
4869 
4870     /*
4871      * If there is a parent, see if it has disappeared due to the
4872      * mapped image getting flattened.
4873      */
4874     if (rbd_dev->parent) {
4875         ret = rbd_dev_v2_parent_info(rbd_dev);
4876         if (ret)
4877             goto out;
4878     }
4879 
4880     rbd_assert(!rbd_is_snap(rbd_dev));
4881     rbd_dev->mapping.size = rbd_dev->header.image_size;
4882 
4883 out:
4884     up_write(&rbd_dev->header_rwsem);
4885     if (!ret && mapping_size != rbd_dev->mapping.size)
4886         rbd_dev_update_size(rbd_dev);
4887 
4888     return ret;
4889 }
4890 
4891 static const struct blk_mq_ops rbd_mq_ops = {
4892     .queue_rq   = rbd_queue_rq,
4893 };
4894 
4895 static int rbd_init_disk(struct rbd_device *rbd_dev)
4896 {
4897     struct gendisk *disk;
4898     struct request_queue *q;
4899     unsigned int objset_bytes =
4900         rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
4901     int err;
4902 
4903     memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4904     rbd_dev->tag_set.ops = &rbd_mq_ops;
4905     rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4906     rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4907     rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4908     rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4909     rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
4910 
4911     err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4912     if (err)
4913         return err;
4914 
4915     disk = blk_mq_alloc_disk(&rbd_dev->tag_set, rbd_dev);
4916     if (IS_ERR(disk)) {
4917         err = PTR_ERR(disk);
4918         goto out_tag_set;
4919     }
4920     q = disk->queue;
4921 
4922     snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4923          rbd_dev->dev_id);
4924     disk->major = rbd_dev->major;
4925     disk->first_minor = rbd_dev->minor;
4926     if (single_major)
4927         disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT);
4928     else
4929         disk->minors = RBD_MINORS_PER_MAJOR;
4930     disk->fops = &rbd_bd_ops;
4931     disk->private_data = rbd_dev;
4932 
4933     blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
4934     /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4935 
4936     blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
4937     q->limits.max_sectors = queue_max_hw_sectors(q);
4938     blk_queue_max_segments(q, USHRT_MAX);
4939     blk_queue_max_segment_size(q, UINT_MAX);
4940     blk_queue_io_min(q, rbd_dev->opts->alloc_size);
4941     blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
4942 
4943     if (rbd_dev->opts->trim) {
4944         q->limits.discard_granularity = rbd_dev->opts->alloc_size;
4945         blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
4946         blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
4947     }
4948 
4949     if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4950         blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
4951 
4952     rbd_dev->disk = disk;
4953 
4954     return 0;
4955 out_tag_set:
4956     blk_mq_free_tag_set(&rbd_dev->tag_set);
4957     return err;
4958 }
4959 
4960 /*
4961   sysfs
4962 */
4963 
4964 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4965 {
4966     return container_of(dev, struct rbd_device, dev);
4967 }
4968 
4969 static ssize_t rbd_size_show(struct device *dev,
4970                  struct device_attribute *attr, char *buf)
4971 {
4972     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4973 
4974     return sprintf(buf, "%llu\n",
4975         (unsigned long long)rbd_dev->mapping.size);
4976 }
4977 
4978 static ssize_t rbd_features_show(struct device *dev,
4979                  struct device_attribute *attr, char *buf)
4980 {
4981     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4982 
4983     return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
4984 }
4985 
4986 static ssize_t rbd_major_show(struct device *dev,
4987                   struct device_attribute *attr, char *buf)
4988 {
4989     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4990 
4991     if (rbd_dev->major)
4992         return sprintf(buf, "%d\n", rbd_dev->major);
4993 
4994     return sprintf(buf, "(none)\n");
4995 }
4996 
4997 static ssize_t rbd_minor_show(struct device *dev,
4998                   struct device_attribute *attr, char *buf)
4999 {
5000     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5001 
5002     return sprintf(buf, "%d\n", rbd_dev->minor);
5003 }
5004 
5005 static ssize_t rbd_client_addr_show(struct device *dev,
5006                     struct device_attribute *attr, char *buf)
5007 {
5008     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5009     struct ceph_entity_addr *client_addr =
5010         ceph_client_addr(rbd_dev->rbd_client->client);
5011 
5012     return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5013                le32_to_cpu(client_addr->nonce));
5014 }
5015 
5016 static ssize_t rbd_client_id_show(struct device *dev,
5017                   struct device_attribute *attr, char *buf)
5018 {
5019     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5020 
5021     return sprintf(buf, "client%lld\n",
5022                ceph_client_gid(rbd_dev->rbd_client->client));
5023 }
5024 
5025 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5026                      struct device_attribute *attr, char *buf)
5027 {
5028     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5029 
5030     return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5031 }
5032 
5033 static ssize_t rbd_config_info_show(struct device *dev,
5034                     struct device_attribute *attr, char *buf)
5035 {
5036     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5037 
5038     if (!capable(CAP_SYS_ADMIN))
5039         return -EPERM;
5040 
5041     return sprintf(buf, "%s\n", rbd_dev->config_info);
5042 }
5043 
5044 static ssize_t rbd_pool_show(struct device *dev,
5045                  struct device_attribute *attr, char *buf)
5046 {
5047     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5048 
5049     return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5050 }
5051 
5052 static ssize_t rbd_pool_id_show(struct device *dev,
5053                  struct device_attribute *attr, char *buf)
5054 {
5055     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5056 
5057     return sprintf(buf, "%llu\n",
5058             (unsigned long long) rbd_dev->spec->pool_id);
5059 }
5060 
5061 static ssize_t rbd_pool_ns_show(struct device *dev,
5062                 struct device_attribute *attr, char *buf)
5063 {
5064     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5065 
5066     return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5067 }
5068 
5069 static ssize_t rbd_name_show(struct device *dev,
5070                  struct device_attribute *attr, char *buf)
5071 {
5072     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5073 
5074     if (rbd_dev->spec->image_name)
5075         return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5076 
5077     return sprintf(buf, "(unknown)\n");
5078 }
5079 
5080 static ssize_t rbd_image_id_show(struct device *dev,
5081                  struct device_attribute *attr, char *buf)
5082 {
5083     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5084 
5085     return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5086 }
5087 
5088 /*
5089  * Shows the name of the currently-mapped snapshot (or
5090  * RBD_SNAP_HEAD_NAME for the base image).
5091  */
5092 static ssize_t rbd_snap_show(struct device *dev,
5093                  struct device_attribute *attr,
5094                  char *buf)
5095 {
5096     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5097 
5098     return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5099 }
5100 
5101 static ssize_t rbd_snap_id_show(struct device *dev,
5102                 struct device_attribute *attr, char *buf)
5103 {
5104     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5105 
5106     return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5107 }
5108 
5109 /*
5110  * For a v2 image, shows the chain of parent images, separated by empty
5111  * lines.  For v1 images or if there is no parent, shows "(no parent
5112  * image)".
5113  */
5114 static ssize_t rbd_parent_show(struct device *dev,
5115                    struct device_attribute *attr,
5116                    char *buf)
5117 {
5118     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5119     ssize_t count = 0;
5120 
5121     if (!rbd_dev->parent)
5122         return sprintf(buf, "(no parent image)\n");
5123 
5124     for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5125         struct rbd_spec *spec = rbd_dev->parent_spec;
5126 
5127         count += sprintf(&buf[count], "%s"
5128                 "pool_id %llu\npool_name %s\n"
5129                 "pool_ns %s\n"
5130                 "image_id %s\nimage_name %s\n"
5131                 "snap_id %llu\nsnap_name %s\n"
5132                 "overlap %llu\n",
5133                 !count ? "" : "\n", /* first? */
5134                 spec->pool_id, spec->pool_name,
5135                 spec->pool_ns ?: "",
5136                 spec->image_id, spec->image_name ?: "(unknown)",
5137                 spec->snap_id, spec->snap_name,
5138                 rbd_dev->parent_overlap);
5139     }
5140 
5141     return count;
5142 }
5143 
5144 static ssize_t rbd_image_refresh(struct device *dev,
5145                  struct device_attribute *attr,
5146                  const char *buf,
5147                  size_t size)
5148 {
5149     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5150     int ret;
5151 
5152     if (!capable(CAP_SYS_ADMIN))
5153         return -EPERM;
5154 
5155     ret = rbd_dev_refresh(rbd_dev);
5156     if (ret)
5157         return ret;
5158 
5159     return size;
5160 }
5161 
5162 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5163 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5164 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5165 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5166 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5167 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5168 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5169 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5170 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5171 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5172 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5173 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5174 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5175 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5176 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5177 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5178 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5179 
5180 static struct attribute *rbd_attrs[] = {
5181     &dev_attr_size.attr,
5182     &dev_attr_features.attr,
5183     &dev_attr_major.attr,
5184     &dev_attr_minor.attr,
5185     &dev_attr_client_addr.attr,
5186     &dev_attr_client_id.attr,
5187     &dev_attr_cluster_fsid.attr,
5188     &dev_attr_config_info.attr,
5189     &dev_attr_pool.attr,
5190     &dev_attr_pool_id.attr,
5191     &dev_attr_pool_ns.attr,
5192     &dev_attr_name.attr,
5193     &dev_attr_image_id.attr,
5194     &dev_attr_current_snap.attr,
5195     &dev_attr_snap_id.attr,
5196     &dev_attr_parent.attr,
5197     &dev_attr_refresh.attr,
5198     NULL
5199 };
5200 
5201 static struct attribute_group rbd_attr_group = {
5202     .attrs = rbd_attrs,
5203 };
5204 
5205 static const struct attribute_group *rbd_attr_groups[] = {
5206     &rbd_attr_group,
5207     NULL
5208 };
5209 
5210 static void rbd_dev_release(struct device *dev);
5211 
5212 static const struct device_type rbd_device_type = {
5213     .name       = "rbd",
5214     .groups     = rbd_attr_groups,
5215     .release    = rbd_dev_release,
5216 };
5217 
5218 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5219 {
5220     kref_get(&spec->kref);
5221 
5222     return spec;
5223 }
5224 
5225 static void rbd_spec_free(struct kref *kref);
5226 static void rbd_spec_put(struct rbd_spec *spec)
5227 {
5228     if (spec)
5229         kref_put(&spec->kref, rbd_spec_free);
5230 }
5231 
5232 static struct rbd_spec *rbd_spec_alloc(void)
5233 {
5234     struct rbd_spec *spec;
5235 
5236     spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5237     if (!spec)
5238         return NULL;
5239 
5240     spec->pool_id = CEPH_NOPOOL;
5241     spec->snap_id = CEPH_NOSNAP;
5242     kref_init(&spec->kref);
5243 
5244     return spec;
5245 }
5246 
5247 static void rbd_spec_free(struct kref *kref)
5248 {
5249     struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5250 
5251     kfree(spec->pool_name);
5252     kfree(spec->pool_ns);
5253     kfree(spec->image_id);
5254     kfree(spec->image_name);
5255     kfree(spec->snap_name);
5256     kfree(spec);
5257 }
5258 
5259 static void rbd_dev_free(struct rbd_device *rbd_dev)
5260 {
5261     WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5262     WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5263 
5264     ceph_oid_destroy(&rbd_dev->header_oid);
5265     ceph_oloc_destroy(&rbd_dev->header_oloc);
5266     kfree(rbd_dev->config_info);
5267 
5268     rbd_put_client(rbd_dev->rbd_client);
5269     rbd_spec_put(rbd_dev->spec);
5270     kfree(rbd_dev->opts);
5271     kfree(rbd_dev);
5272 }
5273 
5274 static void rbd_dev_release(struct device *dev)
5275 {
5276     struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5277     bool need_put = !!rbd_dev->opts;
5278 
5279     if (need_put) {
5280         destroy_workqueue(rbd_dev->task_wq);
5281         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5282     }
5283 
5284     rbd_dev_free(rbd_dev);
5285 
5286     /*
5287      * This is racy, but way better than putting module outside of
5288      * the release callback.  The race window is pretty small, so
5289      * doing something similar to dm (dm-builtin.c) is overkill.
5290      */
5291     if (need_put)
5292         module_put(THIS_MODULE);
5293 }
5294 
5295 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5296                        struct rbd_spec *spec)
5297 {
5298     struct rbd_device *rbd_dev;
5299 
5300     rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5301     if (!rbd_dev)
5302         return NULL;
5303 
5304     spin_lock_init(&rbd_dev->lock);
5305     INIT_LIST_HEAD(&rbd_dev->node);
5306     init_rwsem(&rbd_dev->header_rwsem);
5307 
5308     rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5309     ceph_oid_init(&rbd_dev->header_oid);
5310     rbd_dev->header_oloc.pool = spec->pool_id;
5311     if (spec->pool_ns) {
5312         WARN_ON(!*spec->pool_ns);
5313         rbd_dev->header_oloc.pool_ns =
5314             ceph_find_or_create_string(spec->pool_ns,
5315                            strlen(spec->pool_ns));
5316     }
5317 
5318     mutex_init(&rbd_dev->watch_mutex);
5319     rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5320     INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5321 
5322     init_rwsem(&rbd_dev->lock_rwsem);
5323     rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5324     INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5325     INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5326     INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5327     INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5328     spin_lock_init(&rbd_dev->lock_lists_lock);
5329     INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5330     INIT_LIST_HEAD(&rbd_dev->running_list);
5331     init_completion(&rbd_dev->acquire_wait);
5332     init_completion(&rbd_dev->releasing_wait);
5333 
5334     spin_lock_init(&rbd_dev->object_map_lock);
5335 
5336     rbd_dev->dev.bus = &rbd_bus_type;
5337     rbd_dev->dev.type = &rbd_device_type;
5338     rbd_dev->dev.parent = &rbd_root_dev;
5339     device_initialize(&rbd_dev->dev);
5340 
5341     rbd_dev->rbd_client = rbdc;
5342     rbd_dev->spec = spec;
5343 
5344     return rbd_dev;
5345 }
5346 
5347 /*
5348  * Create a mapping rbd_dev.
5349  */
5350 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5351                      struct rbd_spec *spec,
5352                      struct rbd_options *opts)
5353 {
5354     struct rbd_device *rbd_dev;
5355 
5356     rbd_dev = __rbd_dev_create(rbdc, spec);
5357     if (!rbd_dev)
5358         return NULL;
5359 
5360     rbd_dev->opts = opts;
5361 
5362     /* get an id and fill in device name */
5363     rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5364                      minor_to_rbd_dev_id(1 << MINORBITS),
5365                      GFP_KERNEL);
5366     if (rbd_dev->dev_id < 0)
5367         goto fail_rbd_dev;
5368 
5369     sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5370     rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5371                            rbd_dev->name);
5372     if (!rbd_dev->task_wq)
5373         goto fail_dev_id;
5374 
5375     /* we have a ref from do_rbd_add() */
5376     __module_get(THIS_MODULE);
5377 
5378     dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5379     return rbd_dev;
5380 
5381 fail_dev_id:
5382     ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5383 fail_rbd_dev:
5384     rbd_dev_free(rbd_dev);
5385     return NULL;
5386 }
5387 
5388 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5389 {
5390     if (rbd_dev)
5391         put_device(&rbd_dev->dev);
5392 }
5393 
5394 /*
5395  * Get the size and object order for an image snapshot, or if
5396  * snap_id is CEPH_NOSNAP, gets this information for the base
5397  * image.
5398  */
5399 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5400                 u8 *order, u64 *snap_size)
5401 {
5402     __le64 snapid = cpu_to_le64(snap_id);
5403     int ret;
5404     struct {
5405         u8 order;
5406         __le64 size;
5407     } __attribute__ ((packed)) size_buf = { 0 };
5408 
5409     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5410                   &rbd_dev->header_oloc, "get_size",
5411                   &snapid, sizeof(snapid),
5412                   &size_buf, sizeof(size_buf));
5413     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5414     if (ret < 0)
5415         return ret;
5416     if (ret < sizeof (size_buf))
5417         return -ERANGE;
5418 
5419     if (order) {
5420         *order = size_buf.order;
5421         dout("  order %u", (unsigned int)*order);
5422     }
5423     *snap_size = le64_to_cpu(size_buf.size);
5424 
5425     dout("  snap_id 0x%016llx snap_size = %llu\n",
5426         (unsigned long long)snap_id,
5427         (unsigned long long)*snap_size);
5428 
5429     return 0;
5430 }
5431 
5432 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5433 {
5434     return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5435                     &rbd_dev->header.obj_order,
5436                     &rbd_dev->header.image_size);
5437 }
5438 
5439 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5440 {
5441     size_t size;
5442     void *reply_buf;
5443     int ret;
5444     void *p;
5445 
5446     /* Response will be an encoded string, which includes a length */
5447     size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5448     reply_buf = kzalloc(size, GFP_KERNEL);
5449     if (!reply_buf)
5450         return -ENOMEM;
5451 
5452     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5453                   &rbd_dev->header_oloc, "get_object_prefix",
5454                   NULL, 0, reply_buf, size);
5455     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5456     if (ret < 0)
5457         goto out;
5458 
5459     p = reply_buf;
5460     rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5461                         p + ret, NULL, GFP_NOIO);
5462     ret = 0;
5463 
5464     if (IS_ERR(rbd_dev->header.object_prefix)) {
5465         ret = PTR_ERR(rbd_dev->header.object_prefix);
5466         rbd_dev->header.object_prefix = NULL;
5467     } else {
5468         dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5469     }
5470 out:
5471     kfree(reply_buf);
5472 
5473     return ret;
5474 }
5475 
5476 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5477                      bool read_only, u64 *snap_features)
5478 {
5479     struct {
5480         __le64 snap_id;
5481         u8 read_only;
5482     } features_in;
5483     struct {
5484         __le64 features;
5485         __le64 incompat;
5486     } __attribute__ ((packed)) features_buf = { 0 };
5487     u64 unsup;
5488     int ret;
5489 
5490     features_in.snap_id = cpu_to_le64(snap_id);
5491     features_in.read_only = read_only;
5492 
5493     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5494                   &rbd_dev->header_oloc, "get_features",
5495                   &features_in, sizeof(features_in),
5496                   &features_buf, sizeof(features_buf));
5497     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5498     if (ret < 0)
5499         return ret;
5500     if (ret < sizeof (features_buf))
5501         return -ERANGE;
5502 
5503     unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5504     if (unsup) {
5505         rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5506              unsup);
5507         return -ENXIO;
5508     }
5509 
5510     *snap_features = le64_to_cpu(features_buf.features);
5511 
5512     dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5513         (unsigned long long)snap_id,
5514         (unsigned long long)*snap_features,
5515         (unsigned long long)le64_to_cpu(features_buf.incompat));
5516 
5517     return 0;
5518 }
5519 
5520 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5521 {
5522     return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5523                      rbd_is_ro(rbd_dev),
5524                      &rbd_dev->header.features);
5525 }
5526 
5527 /*
5528  * These are generic image flags, but since they are used only for
5529  * object map, store them in rbd_dev->object_map_flags.
5530  *
5531  * For the same reason, this function is called only on object map
5532  * (re)load and not on header refresh.
5533  */
5534 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5535 {
5536     __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5537     __le64 flags;
5538     int ret;
5539 
5540     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5541                   &rbd_dev->header_oloc, "get_flags",
5542                   &snapid, sizeof(snapid),
5543                   &flags, sizeof(flags));
5544     if (ret < 0)
5545         return ret;
5546     if (ret < sizeof(flags))
5547         return -EBADMSG;
5548 
5549     rbd_dev->object_map_flags = le64_to_cpu(flags);
5550     return 0;
5551 }
5552 
5553 struct parent_image_info {
5554     u64     pool_id;
5555     const char  *pool_ns;
5556     const char  *image_id;
5557     u64     snap_id;
5558 
5559     bool        has_overlap;
5560     u64     overlap;
5561 };
5562 
5563 /*
5564  * The caller is responsible for @pii.
5565  */
5566 static int decode_parent_image_spec(void **p, void *end,
5567                     struct parent_image_info *pii)
5568 {
5569     u8 struct_v;
5570     u32 struct_len;
5571     int ret;
5572 
5573     ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5574                   &struct_v, &struct_len);
5575     if (ret)
5576         return ret;
5577 
5578     ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5579     pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5580     if (IS_ERR(pii->pool_ns)) {
5581         ret = PTR_ERR(pii->pool_ns);
5582         pii->pool_ns = NULL;
5583         return ret;
5584     }
5585     pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5586     if (IS_ERR(pii->image_id)) {
5587         ret = PTR_ERR(pii->image_id);
5588         pii->image_id = NULL;
5589         return ret;
5590     }
5591     ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5592     return 0;
5593 
5594 e_inval:
5595     return -EINVAL;
5596 }
5597 
5598 static int __get_parent_info(struct rbd_device *rbd_dev,
5599                  struct page *req_page,
5600                  struct page *reply_page,
5601                  struct parent_image_info *pii)
5602 {
5603     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5604     size_t reply_len = PAGE_SIZE;
5605     void *p, *end;
5606     int ret;
5607 
5608     ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5609                  "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5610                  req_page, sizeof(u64), &reply_page, &reply_len);
5611     if (ret)
5612         return ret == -EOPNOTSUPP ? 1 : ret;
5613 
5614     p = page_address(reply_page);
5615     end = p + reply_len;
5616     ret = decode_parent_image_spec(&p, end, pii);
5617     if (ret)
5618         return ret;
5619 
5620     ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5621                  "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5622                  req_page, sizeof(u64), &reply_page, &reply_len);
5623     if (ret)
5624         return ret;
5625 
5626     p = page_address(reply_page);
5627     end = p + reply_len;
5628     ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5629     if (pii->has_overlap)
5630         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5631 
5632     return 0;
5633 
5634 e_inval:
5635     return -EINVAL;
5636 }
5637 
5638 /*
5639  * The caller is responsible for @pii.
5640  */
5641 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5642                     struct page *req_page,
5643                     struct page *reply_page,
5644                     struct parent_image_info *pii)
5645 {
5646     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5647     size_t reply_len = PAGE_SIZE;
5648     void *p, *end;
5649     int ret;
5650 
5651     ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5652                  "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5653                  req_page, sizeof(u64), &reply_page, &reply_len);
5654     if (ret)
5655         return ret;
5656 
5657     p = page_address(reply_page);
5658     end = p + reply_len;
5659     ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5660     pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5661     if (IS_ERR(pii->image_id)) {
5662         ret = PTR_ERR(pii->image_id);
5663         pii->image_id = NULL;
5664         return ret;
5665     }
5666     ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5667     pii->has_overlap = true;
5668     ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5669 
5670     return 0;
5671 
5672 e_inval:
5673     return -EINVAL;
5674 }
5675 
5676 static int get_parent_info(struct rbd_device *rbd_dev,
5677                struct parent_image_info *pii)
5678 {
5679     struct page *req_page, *reply_page;
5680     void *p;
5681     int ret;
5682 
5683     req_page = alloc_page(GFP_KERNEL);
5684     if (!req_page)
5685         return -ENOMEM;
5686 
5687     reply_page = alloc_page(GFP_KERNEL);
5688     if (!reply_page) {
5689         __free_page(req_page);
5690         return -ENOMEM;
5691     }
5692 
5693     p = page_address(req_page);
5694     ceph_encode_64(&p, rbd_dev->spec->snap_id);
5695     ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5696     if (ret > 0)
5697         ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5698                            pii);
5699 
5700     __free_page(req_page);
5701     __free_page(reply_page);
5702     return ret;
5703 }
5704 
5705 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5706 {
5707     struct rbd_spec *parent_spec;
5708     struct parent_image_info pii = { 0 };
5709     int ret;
5710 
5711     parent_spec = rbd_spec_alloc();
5712     if (!parent_spec)
5713         return -ENOMEM;
5714 
5715     ret = get_parent_info(rbd_dev, &pii);
5716     if (ret)
5717         goto out_err;
5718 
5719     dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5720          __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5721          pii.has_overlap, pii.overlap);
5722 
5723     if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5724         /*
5725          * Either the parent never existed, or we have
5726          * record of it but the image got flattened so it no
5727          * longer has a parent.  When the parent of a
5728          * layered image disappears we immediately set the
5729          * overlap to 0.  The effect of this is that all new
5730          * requests will be treated as if the image had no
5731          * parent.
5732          *
5733          * If !pii.has_overlap, the parent image spec is not
5734          * applicable.  It's there to avoid duplication in each
5735          * snapshot record.
5736          */
5737         if (rbd_dev->parent_overlap) {
5738             rbd_dev->parent_overlap = 0;
5739             rbd_dev_parent_put(rbd_dev);
5740             pr_info("%s: clone image has been flattened\n",
5741                 rbd_dev->disk->disk_name);
5742         }
5743 
5744         goto out;   /* No parent?  No problem. */
5745     }
5746 
5747     /* The ceph file layout needs to fit pool id in 32 bits */
5748 
5749     ret = -EIO;
5750     if (pii.pool_id > (u64)U32_MAX) {
5751         rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5752             (unsigned long long)pii.pool_id, U32_MAX);
5753         goto out_err;
5754     }
5755 
5756     /*
5757      * The parent won't change (except when the clone is
5758      * flattened, already handled that).  So we only need to
5759      * record the parent spec we have not already done so.
5760      */
5761     if (!rbd_dev->parent_spec) {
5762         parent_spec->pool_id = pii.pool_id;
5763         if (pii.pool_ns && *pii.pool_ns) {
5764             parent_spec->pool_ns = pii.pool_ns;
5765             pii.pool_ns = NULL;
5766         }
5767         parent_spec->image_id = pii.image_id;
5768         pii.image_id = NULL;
5769         parent_spec->snap_id = pii.snap_id;
5770 
5771         rbd_dev->parent_spec = parent_spec;
5772         parent_spec = NULL; /* rbd_dev now owns this */
5773     }
5774 
5775     /*
5776      * We always update the parent overlap.  If it's zero we issue
5777      * a warning, as we will proceed as if there was no parent.
5778      */
5779     if (!pii.overlap) {
5780         if (parent_spec) {
5781             /* refresh, careful to warn just once */
5782             if (rbd_dev->parent_overlap)
5783                 rbd_warn(rbd_dev,
5784                     "clone now standalone (overlap became 0)");
5785         } else {
5786             /* initial probe */
5787             rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5788         }
5789     }
5790     rbd_dev->parent_overlap = pii.overlap;
5791 
5792 out:
5793     ret = 0;
5794 out_err:
5795     kfree(pii.pool_ns);
5796     kfree(pii.image_id);
5797     rbd_spec_put(parent_spec);
5798     return ret;
5799 }
5800 
5801 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5802 {
5803     struct {
5804         __le64 stripe_unit;
5805         __le64 stripe_count;
5806     } __attribute__ ((packed)) striping_info_buf = { 0 };
5807     size_t size = sizeof (striping_info_buf);
5808     void *p;
5809     int ret;
5810 
5811     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5812                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5813                 NULL, 0, &striping_info_buf, size);
5814     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5815     if (ret < 0)
5816         return ret;
5817     if (ret < size)
5818         return -ERANGE;
5819 
5820     p = &striping_info_buf;
5821     rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5822     rbd_dev->header.stripe_count = ceph_decode_64(&p);
5823     return 0;
5824 }
5825 
5826 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5827 {
5828     __le64 data_pool_id;
5829     int ret;
5830 
5831     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5832                   &rbd_dev->header_oloc, "get_data_pool",
5833                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5834     if (ret < 0)
5835         return ret;
5836     if (ret < sizeof(data_pool_id))
5837         return -EBADMSG;
5838 
5839     rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5840     WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5841     return 0;
5842 }
5843 
5844 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5845 {
5846     CEPH_DEFINE_OID_ONSTACK(oid);
5847     size_t image_id_size;
5848     char *image_id;
5849     void *p;
5850     void *end;
5851     size_t size;
5852     void *reply_buf = NULL;
5853     size_t len = 0;
5854     char *image_name = NULL;
5855     int ret;
5856 
5857     rbd_assert(!rbd_dev->spec->image_name);
5858 
5859     len = strlen(rbd_dev->spec->image_id);
5860     image_id_size = sizeof (__le32) + len;
5861     image_id = kmalloc(image_id_size, GFP_KERNEL);
5862     if (!image_id)
5863         return NULL;
5864 
5865     p = image_id;
5866     end = image_id + image_id_size;
5867     ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5868 
5869     size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5870     reply_buf = kmalloc(size, GFP_KERNEL);
5871     if (!reply_buf)
5872         goto out;
5873 
5874     ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5875     ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5876                   "dir_get_name", image_id, image_id_size,
5877                   reply_buf, size);
5878     if (ret < 0)
5879         goto out;
5880     p = reply_buf;
5881     end = reply_buf + ret;
5882 
5883     image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5884     if (IS_ERR(image_name))
5885         image_name = NULL;
5886     else
5887         dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5888 out:
5889     kfree(reply_buf);
5890     kfree(image_id);
5891 
5892     return image_name;
5893 }
5894 
5895 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5896 {
5897     struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5898     const char *snap_name;
5899     u32 which = 0;
5900 
5901     /* Skip over names until we find the one we are looking for */
5902 
5903     snap_name = rbd_dev->header.snap_names;
5904     while (which < snapc->num_snaps) {
5905         if (!strcmp(name, snap_name))
5906             return snapc->snaps[which];
5907         snap_name += strlen(snap_name) + 1;
5908         which++;
5909     }
5910     return CEPH_NOSNAP;
5911 }
5912 
5913 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5914 {
5915     struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5916     u32 which;
5917     bool found = false;
5918     u64 snap_id;
5919 
5920     for (which = 0; !found && which < snapc->num_snaps; which++) {
5921         const char *snap_name;
5922 
5923         snap_id = snapc->snaps[which];
5924         snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5925         if (IS_ERR(snap_name)) {
5926             /* ignore no-longer existing snapshots */
5927             if (PTR_ERR(snap_name) == -ENOENT)
5928                 continue;
5929             else
5930                 break;
5931         }
5932         found = !strcmp(name, snap_name);
5933         kfree(snap_name);
5934     }
5935     return found ? snap_id : CEPH_NOSNAP;
5936 }
5937 
5938 /*
5939  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5940  * no snapshot by that name is found, or if an error occurs.
5941  */
5942 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5943 {
5944     if (rbd_dev->image_format == 1)
5945         return rbd_v1_snap_id_by_name(rbd_dev, name);
5946 
5947     return rbd_v2_snap_id_by_name(rbd_dev, name);
5948 }
5949 
5950 /*
5951  * An image being mapped will have everything but the snap id.
5952  */
5953 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5954 {
5955     struct rbd_spec *spec = rbd_dev->spec;
5956 
5957     rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5958     rbd_assert(spec->image_id && spec->image_name);
5959     rbd_assert(spec->snap_name);
5960 
5961     if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5962         u64 snap_id;
5963 
5964         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5965         if (snap_id == CEPH_NOSNAP)
5966             return -ENOENT;
5967 
5968         spec->snap_id = snap_id;
5969     } else {
5970         spec->snap_id = CEPH_NOSNAP;
5971     }
5972 
5973     return 0;
5974 }
5975 
5976 /*
5977  * A parent image will have all ids but none of the names.
5978  *
5979  * All names in an rbd spec are dynamically allocated.  It's OK if we
5980  * can't figure out the name for an image id.
5981  */
5982 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5983 {
5984     struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5985     struct rbd_spec *spec = rbd_dev->spec;
5986     const char *pool_name;
5987     const char *image_name;
5988     const char *snap_name;
5989     int ret;
5990 
5991     rbd_assert(spec->pool_id != CEPH_NOPOOL);
5992     rbd_assert(spec->image_id);
5993     rbd_assert(spec->snap_id != CEPH_NOSNAP);
5994 
5995     /* Get the pool name; we have to make our own copy of this */
5996 
5997     pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5998     if (!pool_name) {
5999         rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6000         return -EIO;
6001     }
6002     pool_name = kstrdup(pool_name, GFP_KERNEL);
6003     if (!pool_name)
6004         return -ENOMEM;
6005 
6006     /* Fetch the image name; tolerate failure here */
6007 
6008     image_name = rbd_dev_image_name(rbd_dev);
6009     if (!image_name)
6010         rbd_warn(rbd_dev, "unable to get image name");
6011 
6012     /* Fetch the snapshot name */
6013 
6014     snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6015     if (IS_ERR(snap_name)) {
6016         ret = PTR_ERR(snap_name);
6017         goto out_err;
6018     }
6019 
6020     spec->pool_name = pool_name;
6021     spec->image_name = image_name;
6022     spec->snap_name = snap_name;
6023 
6024     return 0;
6025 
6026 out_err:
6027     kfree(image_name);
6028     kfree(pool_name);
6029     return ret;
6030 }
6031 
6032 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6033 {
6034     size_t size;
6035     int ret;
6036     void *reply_buf;
6037     void *p;
6038     void *end;
6039     u64 seq;
6040     u32 snap_count;
6041     struct ceph_snap_context *snapc;
6042     u32 i;
6043 
6044     /*
6045      * We'll need room for the seq value (maximum snapshot id),
6046      * snapshot count, and array of that many snapshot ids.
6047      * For now we have a fixed upper limit on the number we're
6048      * prepared to receive.
6049      */
6050     size = sizeof (__le64) + sizeof (__le32) +
6051             RBD_MAX_SNAP_COUNT * sizeof (__le64);
6052     reply_buf = kzalloc(size, GFP_KERNEL);
6053     if (!reply_buf)
6054         return -ENOMEM;
6055 
6056     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6057                   &rbd_dev->header_oloc, "get_snapcontext",
6058                   NULL, 0, reply_buf, size);
6059     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6060     if (ret < 0)
6061         goto out;
6062 
6063     p = reply_buf;
6064     end = reply_buf + ret;
6065     ret = -ERANGE;
6066     ceph_decode_64_safe(&p, end, seq, out);
6067     ceph_decode_32_safe(&p, end, snap_count, out);
6068 
6069     /*
6070      * Make sure the reported number of snapshot ids wouldn't go
6071      * beyond the end of our buffer.  But before checking that,
6072      * make sure the computed size of the snapshot context we
6073      * allocate is representable in a size_t.
6074      */
6075     if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6076                  / sizeof (u64)) {
6077         ret = -EINVAL;
6078         goto out;
6079     }
6080     if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6081         goto out;
6082     ret = 0;
6083 
6084     snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6085     if (!snapc) {
6086         ret = -ENOMEM;
6087         goto out;
6088     }
6089     snapc->seq = seq;
6090     for (i = 0; i < snap_count; i++)
6091         snapc->snaps[i] = ceph_decode_64(&p);
6092 
6093     ceph_put_snap_context(rbd_dev->header.snapc);
6094     rbd_dev->header.snapc = snapc;
6095 
6096     dout("  snap context seq = %llu, snap_count = %u\n",
6097         (unsigned long long)seq, (unsigned int)snap_count);
6098 out:
6099     kfree(reply_buf);
6100 
6101     return ret;
6102 }
6103 
6104 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6105                     u64 snap_id)
6106 {
6107     size_t size;
6108     void *reply_buf;
6109     __le64 snapid;
6110     int ret;
6111     void *p;
6112     void *end;
6113     char *snap_name;
6114 
6115     size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6116     reply_buf = kmalloc(size, GFP_KERNEL);
6117     if (!reply_buf)
6118         return ERR_PTR(-ENOMEM);
6119 
6120     snapid = cpu_to_le64(snap_id);
6121     ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6122                   &rbd_dev->header_oloc, "get_snapshot_name",
6123                   &snapid, sizeof(snapid), reply_buf, size);
6124     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6125     if (ret < 0) {
6126         snap_name = ERR_PTR(ret);
6127         goto out;
6128     }
6129 
6130     p = reply_buf;
6131     end = reply_buf + ret;
6132     snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6133     if (IS_ERR(snap_name))
6134         goto out;
6135 
6136     dout("  snap_id 0x%016llx snap_name = %s\n",
6137         (unsigned long long)snap_id, snap_name);
6138 out:
6139     kfree(reply_buf);
6140 
6141     return snap_name;
6142 }
6143 
6144 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6145 {
6146     bool first_time = rbd_dev->header.object_prefix == NULL;
6147     int ret;
6148 
6149     ret = rbd_dev_v2_image_size(rbd_dev);
6150     if (ret)
6151         return ret;
6152 
6153     if (first_time) {
6154         ret = rbd_dev_v2_header_onetime(rbd_dev);
6155         if (ret)
6156             return ret;
6157     }
6158 
6159     ret = rbd_dev_v2_snap_context(rbd_dev);
6160     if (ret && first_time) {
6161         kfree(rbd_dev->header.object_prefix);
6162         rbd_dev->header.object_prefix = NULL;
6163     }
6164 
6165     return ret;
6166 }
6167 
6168 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6169 {
6170     rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6171 
6172     if (rbd_dev->image_format == 1)
6173         return rbd_dev_v1_header_info(rbd_dev);
6174 
6175     return rbd_dev_v2_header_info(rbd_dev);
6176 }
6177 
6178 /*
6179  * Skips over white space at *buf, and updates *buf to point to the
6180  * first found non-space character (if any). Returns the length of
6181  * the token (string of non-white space characters) found.  Note
6182  * that *buf must be terminated with '\0'.
6183  */
6184 static inline size_t next_token(const char **buf)
6185 {
6186         /*
6187         * These are the characters that produce nonzero for
6188         * isspace() in the "C" and "POSIX" locales.
6189         */
6190     static const char spaces[] = " \f\n\r\t\v";
6191 
6192         *buf += strspn(*buf, spaces);   /* Find start of token */
6193 
6194     return strcspn(*buf, spaces);   /* Return token length */
6195 }
6196 
6197 /*
6198  * Finds the next token in *buf, dynamically allocates a buffer big
6199  * enough to hold a copy of it, and copies the token into the new
6200  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6201  * that a duplicate buffer is created even for a zero-length token.
6202  *
6203  * Returns a pointer to the newly-allocated duplicate, or a null
6204  * pointer if memory for the duplicate was not available.  If
6205  * the lenp argument is a non-null pointer, the length of the token
6206  * (not including the '\0') is returned in *lenp.
6207  *
6208  * If successful, the *buf pointer will be updated to point beyond
6209  * the end of the found token.
6210  *
6211  * Note: uses GFP_KERNEL for allocation.
6212  */
6213 static inline char *dup_token(const char **buf, size_t *lenp)
6214 {
6215     char *dup;
6216     size_t len;
6217 
6218     len = next_token(buf);
6219     dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6220     if (!dup)
6221         return NULL;
6222     *(dup + len) = '\0';
6223     *buf += len;
6224 
6225     if (lenp)
6226         *lenp = len;
6227 
6228     return dup;
6229 }
6230 
6231 static int rbd_parse_param(struct fs_parameter *param,
6232                 struct rbd_parse_opts_ctx *pctx)
6233 {
6234     struct rbd_options *opt = pctx->opts;
6235     struct fs_parse_result result;
6236     struct p_log log = {.prefix = "rbd"};
6237     int token, ret;
6238 
6239     ret = ceph_parse_param(param, pctx->copts, NULL);
6240     if (ret != -ENOPARAM)
6241         return ret;
6242 
6243     token = __fs_parse(&log, rbd_parameters, param, &result);
6244     dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6245     if (token < 0) {
6246         if (token == -ENOPARAM)
6247             return inval_plog(&log, "Unknown parameter '%s'",
6248                       param->key);
6249         return token;
6250     }
6251 
6252     switch (token) {
6253     case Opt_queue_depth:
6254         if (result.uint_32 < 1)
6255             goto out_of_range;
6256         opt->queue_depth = result.uint_32;
6257         break;
6258     case Opt_alloc_size:
6259         if (result.uint_32 < SECTOR_SIZE)
6260             goto out_of_range;
6261         if (!is_power_of_2(result.uint_32))
6262             return inval_plog(&log, "alloc_size must be a power of 2");
6263         opt->alloc_size = result.uint_32;
6264         break;
6265     case Opt_lock_timeout:
6266         /* 0 is "wait forever" (i.e. infinite timeout) */
6267         if (result.uint_32 > INT_MAX / 1000)
6268             goto out_of_range;
6269         opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6270         break;
6271     case Opt_pool_ns:
6272         kfree(pctx->spec->pool_ns);
6273         pctx->spec->pool_ns = param->string;
6274         param->string = NULL;
6275         break;
6276     case Opt_compression_hint:
6277         switch (result.uint_32) {
6278         case Opt_compression_hint_none:
6279             opt->alloc_hint_flags &=
6280                 ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6281                   CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6282             break;
6283         case Opt_compression_hint_compressible:
6284             opt->alloc_hint_flags |=
6285                 CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6286             opt->alloc_hint_flags &=
6287                 ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6288             break;
6289         case Opt_compression_hint_incompressible:
6290             opt->alloc_hint_flags |=
6291                 CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6292             opt->alloc_hint_flags &=
6293                 ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6294             break;
6295         default:
6296             BUG();
6297         }
6298         break;
6299     case Opt_read_only:
6300         opt->read_only = true;
6301         break;
6302     case Opt_read_write:
6303         opt->read_only = false;
6304         break;
6305     case Opt_lock_on_read:
6306         opt->lock_on_read = true;
6307         break;
6308     case Opt_exclusive:
6309         opt->exclusive = true;
6310         break;
6311     case Opt_notrim:
6312         opt->trim = false;
6313         break;
6314     default:
6315         BUG();
6316     }
6317 
6318     return 0;
6319 
6320 out_of_range:
6321     return inval_plog(&log, "%s out of range", param->key);
6322 }
6323 
6324 /*
6325  * This duplicates most of generic_parse_monolithic(), untying it from
6326  * fs_context and skipping standard superblock and security options.
6327  */
6328 static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6329 {
6330     char *key;
6331     int ret = 0;
6332 
6333     dout("%s '%s'\n", __func__, options);
6334     while ((key = strsep(&options, ",")) != NULL) {
6335         if (*key) {
6336             struct fs_parameter param = {
6337                 .key    = key,
6338                 .type   = fs_value_is_flag,
6339             };
6340             char *value = strchr(key, '=');
6341             size_t v_len = 0;
6342 
6343             if (value) {
6344                 if (value == key)
6345                     continue;
6346                 *value++ = 0;
6347                 v_len = strlen(value);
6348                 param.string = kmemdup_nul(value, v_len,
6349                                GFP_KERNEL);
6350                 if (!param.string)
6351                     return -ENOMEM;
6352                 param.type = fs_value_is_string;
6353             }
6354             param.size = v_len;
6355 
6356             ret = rbd_parse_param(&param, pctx);
6357             kfree(param.string);
6358             if (ret)
6359                 break;
6360         }
6361     }
6362 
6363     return ret;
6364 }
6365 
6366 /*
6367  * Parse the options provided for an "rbd add" (i.e., rbd image
6368  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6369  * and the data written is passed here via a NUL-terminated buffer.
6370  * Returns 0 if successful or an error code otherwise.
6371  *
6372  * The information extracted from these options is recorded in
6373  * the other parameters which return dynamically-allocated
6374  * structures:
6375  *  ceph_opts
6376  *      The address of a pointer that will refer to a ceph options
6377  *      structure.  Caller must release the returned pointer using
6378  *      ceph_destroy_options() when it is no longer needed.
6379  *  rbd_opts
6380  *  Address of an rbd options pointer.  Fully initialized by
6381  *  this function; caller must release with kfree().
6382  *  spec
6383  *  Address of an rbd image specification pointer.  Fully
6384  *  initialized by this function based on parsed options.
6385  *  Caller must release with rbd_spec_put().
6386  *
6387  * The options passed take this form:
6388  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
6389  * where:
6390  *  <mon_addrs>
6391  *      A comma-separated list of one or more monitor addresses.
6392  *      A monitor address is an ip address, optionally followed
6393  *      by a port number (separated by a colon).
6394  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6395  *  <options>
6396  *      A comma-separated list of ceph and/or rbd options.
6397  *  <pool_name>
6398  *      The name of the rados pool containing the rbd image.
6399  *  <image_name>
6400  *      The name of the image in that pool to map.
6401  *  <snap_id>
6402  *      An optional snapshot id.  If provided, the mapping will
6403  *      present data from the image at the time that snapshot was
6404  *      created.  The image head is used if no snapshot id is
6405  *      provided.  Snapshot mappings are always read-only.
6406  */
6407 static int rbd_add_parse_args(const char *buf,
6408                 struct ceph_options **ceph_opts,
6409                 struct rbd_options **opts,
6410                 struct rbd_spec **rbd_spec)
6411 {
6412     size_t len;
6413     char *options;
6414     const char *mon_addrs;
6415     char *snap_name;
6416     size_t mon_addrs_size;
6417     struct rbd_parse_opts_ctx pctx = { 0 };
6418     int ret;
6419 
6420     /* The first four tokens are required */
6421 
6422     len = next_token(&buf);
6423     if (!len) {
6424         rbd_warn(NULL, "no monitor address(es) provided");
6425         return -EINVAL;
6426     }
6427     mon_addrs = buf;
6428     mon_addrs_size = len;
6429     buf += len;
6430 
6431     ret = -EINVAL;
6432     options = dup_token(&buf, NULL);
6433     if (!options)
6434         return -ENOMEM;
6435     if (!*options) {
6436         rbd_warn(NULL, "no options provided");
6437         goto out_err;
6438     }
6439 
6440     pctx.spec = rbd_spec_alloc();
6441     if (!pctx.spec)
6442         goto out_mem;
6443 
6444     pctx.spec->pool_name = dup_token(&buf, NULL);
6445     if (!pctx.spec->pool_name)
6446         goto out_mem;
6447     if (!*pctx.spec->pool_name) {
6448         rbd_warn(NULL, "no pool name provided");
6449         goto out_err;
6450     }
6451 
6452     pctx.spec->image_name = dup_token(&buf, NULL);
6453     if (!pctx.spec->image_name)
6454         goto out_mem;
6455     if (!*pctx.spec->image_name) {
6456         rbd_warn(NULL, "no image name provided");
6457         goto out_err;
6458     }
6459 
6460     /*
6461      * Snapshot name is optional; default is to use "-"
6462      * (indicating the head/no snapshot).
6463      */
6464     len = next_token(&buf);
6465     if (!len) {
6466         buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6467         len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6468     } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6469         ret = -ENAMETOOLONG;
6470         goto out_err;
6471     }
6472     snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6473     if (!snap_name)
6474         goto out_mem;
6475     *(snap_name + len) = '\0';
6476     pctx.spec->snap_name = snap_name;
6477 
6478     pctx.copts = ceph_alloc_options();
6479     if (!pctx.copts)
6480         goto out_mem;
6481 
6482     /* Initialize all rbd options to the defaults */
6483 
6484     pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6485     if (!pctx.opts)
6486         goto out_mem;
6487 
6488     pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6489     pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6490     pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6491     pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6492     pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6493     pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6494     pctx.opts->trim = RBD_TRIM_DEFAULT;
6495 
6496     ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL,
6497                  ',');
6498     if (ret)
6499         goto out_err;
6500 
6501     ret = rbd_parse_options(options, &pctx);
6502     if (ret)
6503         goto out_err;
6504 
6505     *ceph_opts = pctx.copts;
6506     *opts = pctx.opts;
6507     *rbd_spec = pctx.spec;
6508     kfree(options);
6509     return 0;
6510 
6511 out_mem:
6512     ret = -ENOMEM;
6513 out_err:
6514     kfree(pctx.opts);
6515     ceph_destroy_options(pctx.copts);
6516     rbd_spec_put(pctx.spec);
6517     kfree(options);
6518     return ret;
6519 }
6520 
6521 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6522 {
6523     down_write(&rbd_dev->lock_rwsem);
6524     if (__rbd_is_lock_owner(rbd_dev))
6525         __rbd_release_lock(rbd_dev);
6526     up_write(&rbd_dev->lock_rwsem);
6527 }
6528 
6529 /*
6530  * If the wait is interrupted, an error is returned even if the lock
6531  * was successfully acquired.  rbd_dev_image_unlock() will release it
6532  * if needed.
6533  */
6534 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6535 {
6536     long ret;
6537 
6538     if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6539         if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6540             return 0;
6541 
6542         rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6543         return -EINVAL;
6544     }
6545 
6546     if (rbd_is_ro(rbd_dev))
6547         return 0;
6548 
6549     rbd_assert(!rbd_is_lock_owner(rbd_dev));
6550     queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6551     ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6552                 ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6553     if (ret > 0) {
6554         ret = rbd_dev->acquire_err;
6555     } else {
6556         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6557         if (!ret)
6558             ret = -ETIMEDOUT;
6559     }
6560 
6561     if (ret) {
6562         rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6563         return ret;
6564     }
6565 
6566     /*
6567      * The lock may have been released by now, unless automatic lock
6568      * transitions are disabled.
6569      */
6570     rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6571     return 0;
6572 }
6573 
6574 /*
6575  * An rbd format 2 image has a unique identifier, distinct from the
6576  * name given to it by the user.  Internally, that identifier is
6577  * what's used to specify the names of objects related to the image.
6578  *
6579  * A special "rbd id" object is used to map an rbd image name to its
6580  * id.  If that object doesn't exist, then there is no v2 rbd image
6581  * with the supplied name.
6582  *
6583  * This function will record the given rbd_dev's image_id field if
6584  * it can be determined, and in that case will return 0.  If any
6585  * errors occur a negative errno will be returned and the rbd_dev's
6586  * image_id field will be unchanged (and should be NULL).
6587  */
6588 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6589 {
6590     int ret;
6591     size_t size;
6592     CEPH_DEFINE_OID_ONSTACK(oid);
6593     void *response;
6594     char *image_id;
6595 
6596     /*
6597      * When probing a parent image, the image id is already
6598      * known (and the image name likely is not).  There's no
6599      * need to fetch the image id again in this case.  We
6600      * do still need to set the image format though.
6601      */
6602     if (rbd_dev->spec->image_id) {
6603         rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6604 
6605         return 0;
6606     }
6607 
6608     /*
6609      * First, see if the format 2 image id file exists, and if
6610      * so, get the image's persistent id from it.
6611      */
6612     ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6613                    rbd_dev->spec->image_name);
6614     if (ret)
6615         return ret;
6616 
6617     dout("rbd id object name is %s\n", oid.name);
6618 
6619     /* Response will be an encoded string, which includes a length */
6620     size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6621     response = kzalloc(size, GFP_NOIO);
6622     if (!response) {
6623         ret = -ENOMEM;
6624         goto out;
6625     }
6626 
6627     /* If it doesn't exist we'll assume it's a format 1 image */
6628 
6629     ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6630                   "get_id", NULL, 0,
6631                   response, size);
6632     dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6633     if (ret == -ENOENT) {
6634         image_id = kstrdup("", GFP_KERNEL);
6635         ret = image_id ? 0 : -ENOMEM;
6636         if (!ret)
6637             rbd_dev->image_format = 1;
6638     } else if (ret >= 0) {
6639         void *p = response;
6640 
6641         image_id = ceph_extract_encoded_string(&p, p + ret,
6642                         NULL, GFP_NOIO);
6643         ret = PTR_ERR_OR_ZERO(image_id);
6644         if (!ret)
6645             rbd_dev->image_format = 2;
6646     }
6647 
6648     if (!ret) {
6649         rbd_dev->spec->image_id = image_id;
6650         dout("image_id is %s\n", image_id);
6651     }
6652 out:
6653     kfree(response);
6654     ceph_oid_destroy(&oid);
6655     return ret;
6656 }
6657 
6658 /*
6659  * Undo whatever state changes are made by v1 or v2 header info
6660  * call.
6661  */
6662 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6663 {
6664     struct rbd_image_header *header;
6665 
6666     rbd_dev_parent_put(rbd_dev);
6667     rbd_object_map_free(rbd_dev);
6668     rbd_dev_mapping_clear(rbd_dev);
6669 
6670     /* Free dynamic fields from the header, then zero it out */
6671 
6672     header = &rbd_dev->header;
6673     ceph_put_snap_context(header->snapc);
6674     kfree(header->snap_sizes);
6675     kfree(header->snap_names);
6676     kfree(header->object_prefix);
6677     memset(header, 0, sizeof (*header));
6678 }
6679 
6680 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6681 {
6682     int ret;
6683 
6684     ret = rbd_dev_v2_object_prefix(rbd_dev);
6685     if (ret)
6686         goto out_err;
6687 
6688     /*
6689      * Get the and check features for the image.  Currently the
6690      * features are assumed to never change.
6691      */
6692     ret = rbd_dev_v2_features(rbd_dev);
6693     if (ret)
6694         goto out_err;
6695 
6696     /* If the image supports fancy striping, get its parameters */
6697 
6698     if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6699         ret = rbd_dev_v2_striping_info(rbd_dev);
6700         if (ret < 0)
6701             goto out_err;
6702     }
6703 
6704     if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6705         ret = rbd_dev_v2_data_pool(rbd_dev);
6706         if (ret)
6707             goto out_err;
6708     }
6709 
6710     rbd_init_layout(rbd_dev);
6711     return 0;
6712 
6713 out_err:
6714     rbd_dev->header.features = 0;
6715     kfree(rbd_dev->header.object_prefix);
6716     rbd_dev->header.object_prefix = NULL;
6717     return ret;
6718 }
6719 
6720 /*
6721  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6722  * rbd_dev_image_probe() recursion depth, which means it's also the
6723  * length of the already discovered part of the parent chain.
6724  */
6725 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6726 {
6727     struct rbd_device *parent = NULL;
6728     int ret;
6729 
6730     if (!rbd_dev->parent_spec)
6731         return 0;
6732 
6733     if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6734         pr_info("parent chain is too long (%d)\n", depth);
6735         ret = -EINVAL;
6736         goto out_err;
6737     }
6738 
6739     parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6740     if (!parent) {
6741         ret = -ENOMEM;
6742         goto out_err;
6743     }
6744 
6745     /*
6746      * Images related by parent/child relationships always share
6747      * rbd_client and spec/parent_spec, so bump their refcounts.
6748      */
6749     __rbd_get_client(rbd_dev->rbd_client);
6750     rbd_spec_get(rbd_dev->parent_spec);
6751 
6752     __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6753 
6754     ret = rbd_dev_image_probe(parent, depth);
6755     if (ret < 0)
6756         goto out_err;
6757 
6758     rbd_dev->parent = parent;
6759     atomic_set(&rbd_dev->parent_ref, 1);
6760     return 0;
6761 
6762 out_err:
6763     rbd_dev_unparent(rbd_dev);
6764     rbd_dev_destroy(parent);
6765     return ret;
6766 }
6767 
6768 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6769 {
6770     clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6771     rbd_free_disk(rbd_dev);
6772     if (!single_major)
6773         unregister_blkdev(rbd_dev->major, rbd_dev->name);
6774 }
6775 
6776 /*
6777  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6778  * upon return.
6779  */
6780 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6781 {
6782     int ret;
6783 
6784     /* Record our major and minor device numbers. */
6785 
6786     if (!single_major) {
6787         ret = register_blkdev(0, rbd_dev->name);
6788         if (ret < 0)
6789             goto err_out_unlock;
6790 
6791         rbd_dev->major = ret;
6792         rbd_dev->minor = 0;
6793     } else {
6794         rbd_dev->major = rbd_major;
6795         rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6796     }
6797 
6798     /* Set up the blkdev mapping. */
6799 
6800     ret = rbd_init_disk(rbd_dev);
6801     if (ret)
6802         goto err_out_blkdev;
6803 
6804     set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6805     set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6806 
6807     ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6808     if (ret)
6809         goto err_out_disk;
6810 
6811     set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6812     up_write(&rbd_dev->header_rwsem);
6813     return 0;
6814 
6815 err_out_disk:
6816     rbd_free_disk(rbd_dev);
6817 err_out_blkdev:
6818     if (!single_major)
6819         unregister_blkdev(rbd_dev->major, rbd_dev->name);
6820 err_out_unlock:
6821     up_write(&rbd_dev->header_rwsem);
6822     return ret;
6823 }
6824 
6825 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6826 {
6827     struct rbd_spec *spec = rbd_dev->spec;
6828     int ret;
6829 
6830     /* Record the header object name for this rbd image. */
6831 
6832     rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6833     if (rbd_dev->image_format == 1)
6834         ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6835                        spec->image_name, RBD_SUFFIX);
6836     else
6837         ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6838                        RBD_HEADER_PREFIX, spec->image_id);
6839 
6840     return ret;
6841 }
6842 
6843 static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6844 {
6845     if (!is_snap) {
6846         pr_info("image %s/%s%s%s does not exist\n",
6847             rbd_dev->spec->pool_name,
6848             rbd_dev->spec->pool_ns ?: "",
6849             rbd_dev->spec->pool_ns ? "/" : "",
6850             rbd_dev->spec->image_name);
6851     } else {
6852         pr_info("snap %s/%s%s%s@%s does not exist\n",
6853             rbd_dev->spec->pool_name,
6854             rbd_dev->spec->pool_ns ?: "",
6855             rbd_dev->spec->pool_ns ? "/" : "",
6856             rbd_dev->spec->image_name,
6857             rbd_dev->spec->snap_name);
6858     }
6859 }
6860 
6861 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6862 {
6863     if (!rbd_is_ro(rbd_dev))
6864         rbd_unregister_watch(rbd_dev);
6865 
6866     rbd_dev_unprobe(rbd_dev);
6867     rbd_dev->image_format = 0;
6868     kfree(rbd_dev->spec->image_id);
6869     rbd_dev->spec->image_id = NULL;
6870 }
6871 
6872 /*
6873  * Probe for the existence of the header object for the given rbd
6874  * device.  If this image is the one being mapped (i.e., not a
6875  * parent), initiate a watch on its header object before using that
6876  * object to get detailed information about the rbd image.
6877  *
6878  * On success, returns with header_rwsem held for write if called
6879  * with @depth == 0.
6880  */
6881 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6882 {
6883     bool need_watch = !rbd_is_ro(rbd_dev);
6884     int ret;
6885 
6886     /*
6887      * Get the id from the image id object.  Unless there's an
6888      * error, rbd_dev->spec->image_id will be filled in with
6889      * a dynamically-allocated string, and rbd_dev->image_format
6890      * will be set to either 1 or 2.
6891      */
6892     ret = rbd_dev_image_id(rbd_dev);
6893     if (ret)
6894         return ret;
6895 
6896     ret = rbd_dev_header_name(rbd_dev);
6897     if (ret)
6898         goto err_out_format;
6899 
6900     if (need_watch) {
6901         ret = rbd_register_watch(rbd_dev);
6902         if (ret) {
6903             if (ret == -ENOENT)
6904                 rbd_print_dne(rbd_dev, false);
6905             goto err_out_format;
6906         }
6907     }
6908 
6909     if (!depth)
6910         down_write(&rbd_dev->header_rwsem);
6911 
6912     ret = rbd_dev_header_info(rbd_dev);
6913     if (ret) {
6914         if (ret == -ENOENT && !need_watch)
6915             rbd_print_dne(rbd_dev, false);
6916         goto err_out_probe;
6917     }
6918 
6919     /*
6920      * If this image is the one being mapped, we have pool name and
6921      * id, image name and id, and snap name - need to fill snap id.
6922      * Otherwise this is a parent image, identified by pool, image
6923      * and snap ids - need to fill in names for those ids.
6924      */
6925     if (!depth)
6926         ret = rbd_spec_fill_snap_id(rbd_dev);
6927     else
6928         ret = rbd_spec_fill_names(rbd_dev);
6929     if (ret) {
6930         if (ret == -ENOENT)
6931             rbd_print_dne(rbd_dev, true);
6932         goto err_out_probe;
6933     }
6934 
6935     ret = rbd_dev_mapping_set(rbd_dev);
6936     if (ret)
6937         goto err_out_probe;
6938 
6939     if (rbd_is_snap(rbd_dev) &&
6940         (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6941         ret = rbd_object_map_load(rbd_dev);
6942         if (ret)
6943             goto err_out_probe;
6944     }
6945 
6946     if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6947         ret = rbd_dev_v2_parent_info(rbd_dev);
6948         if (ret)
6949             goto err_out_probe;
6950     }
6951 
6952     ret = rbd_dev_probe_parent(rbd_dev, depth);
6953     if (ret)
6954         goto err_out_probe;
6955 
6956     dout("discovered format %u image, header name is %s\n",
6957         rbd_dev->image_format, rbd_dev->header_oid.name);
6958     return 0;
6959 
6960 err_out_probe:
6961     if (!depth)
6962         up_write(&rbd_dev->header_rwsem);
6963     if (need_watch)
6964         rbd_unregister_watch(rbd_dev);
6965     rbd_dev_unprobe(rbd_dev);
6966 err_out_format:
6967     rbd_dev->image_format = 0;
6968     kfree(rbd_dev->spec->image_id);
6969     rbd_dev->spec->image_id = NULL;
6970     return ret;
6971 }
6972 
6973 static ssize_t do_rbd_add(struct bus_type *bus,
6974               const char *buf,
6975               size_t count)
6976 {
6977     struct rbd_device *rbd_dev = NULL;
6978     struct ceph_options *ceph_opts = NULL;
6979     struct rbd_options *rbd_opts = NULL;
6980     struct rbd_spec *spec = NULL;
6981     struct rbd_client *rbdc;
6982     int rc;
6983 
6984     if (!capable(CAP_SYS_ADMIN))
6985         return -EPERM;
6986 
6987     if (!try_module_get(THIS_MODULE))
6988         return -ENODEV;
6989 
6990     /* parse add command */
6991     rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6992     if (rc < 0)
6993         goto out;
6994 
6995     rbdc = rbd_get_client(ceph_opts);
6996     if (IS_ERR(rbdc)) {
6997         rc = PTR_ERR(rbdc);
6998         goto err_out_args;
6999     }
7000 
7001     /* pick the pool */
7002     rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7003     if (rc < 0) {
7004         if (rc == -ENOENT)
7005             pr_info("pool %s does not exist\n", spec->pool_name);
7006         goto err_out_client;
7007     }
7008     spec->pool_id = (u64)rc;
7009 
7010     rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7011     if (!rbd_dev) {
7012         rc = -ENOMEM;
7013         goto err_out_client;
7014     }
7015     rbdc = NULL;        /* rbd_dev now owns this */
7016     spec = NULL;        /* rbd_dev now owns this */
7017     rbd_opts = NULL;    /* rbd_dev now owns this */
7018 
7019     /* if we are mapping a snapshot it will be a read-only mapping */
7020     if (rbd_dev->opts->read_only ||
7021         strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7022         __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7023 
7024     rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7025     if (!rbd_dev->config_info) {
7026         rc = -ENOMEM;
7027         goto err_out_rbd_dev;
7028     }
7029 
7030     rc = rbd_dev_image_probe(rbd_dev, 0);
7031     if (rc < 0)
7032         goto err_out_rbd_dev;
7033 
7034     if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7035         rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7036              rbd_dev->layout.object_size);
7037         rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7038     }
7039 
7040     rc = rbd_dev_device_setup(rbd_dev);
7041     if (rc)
7042         goto err_out_image_probe;
7043 
7044     rc = rbd_add_acquire_lock(rbd_dev);
7045     if (rc)
7046         goto err_out_image_lock;
7047 
7048     /* Everything's ready.  Announce the disk to the world. */
7049 
7050     rc = device_add(&rbd_dev->dev);
7051     if (rc)
7052         goto err_out_image_lock;
7053 
7054     rc = device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7055     if (rc)
7056         goto err_out_cleanup_disk;
7057 
7058     spin_lock(&rbd_dev_list_lock);
7059     list_add_tail(&rbd_dev->node, &rbd_dev_list);
7060     spin_unlock(&rbd_dev_list_lock);
7061 
7062     pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7063         (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7064         rbd_dev->header.features);
7065     rc = count;
7066 out:
7067     module_put(THIS_MODULE);
7068     return rc;
7069 
7070 err_out_cleanup_disk:
7071     rbd_free_disk(rbd_dev);
7072 err_out_image_lock:
7073     rbd_dev_image_unlock(rbd_dev);
7074     rbd_dev_device_release(rbd_dev);
7075 err_out_image_probe:
7076     rbd_dev_image_release(rbd_dev);
7077 err_out_rbd_dev:
7078     rbd_dev_destroy(rbd_dev);
7079 err_out_client:
7080     rbd_put_client(rbdc);
7081 err_out_args:
7082     rbd_spec_put(spec);
7083     kfree(rbd_opts);
7084     goto out;
7085 }
7086 
7087 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7088 {
7089     if (single_major)
7090         return -EINVAL;
7091 
7092     return do_rbd_add(bus, buf, count);
7093 }
7094 
7095 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7096                       size_t count)
7097 {
7098     return do_rbd_add(bus, buf, count);
7099 }
7100 
7101 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7102 {
7103     while (rbd_dev->parent) {
7104         struct rbd_device *first = rbd_dev;
7105         struct rbd_device *second = first->parent;
7106         struct rbd_device *third;
7107 
7108         /*
7109          * Follow to the parent with no grandparent and
7110          * remove it.
7111          */
7112         while (second && (third = second->parent)) {
7113             first = second;
7114             second = third;
7115         }
7116         rbd_assert(second);
7117         rbd_dev_image_release(second);
7118         rbd_dev_destroy(second);
7119         first->parent = NULL;
7120         first->parent_overlap = 0;
7121 
7122         rbd_assert(first->parent_spec);
7123         rbd_spec_put(first->parent_spec);
7124         first->parent_spec = NULL;
7125     }
7126 }
7127 
7128 static ssize_t do_rbd_remove(struct bus_type *bus,
7129                  const char *buf,
7130                  size_t count)
7131 {
7132     struct rbd_device *rbd_dev = NULL;
7133     struct list_head *tmp;
7134     int dev_id;
7135     char opt_buf[6];
7136     bool force = false;
7137     int ret;
7138 
7139     if (!capable(CAP_SYS_ADMIN))
7140         return -EPERM;
7141 
7142     dev_id = -1;
7143     opt_buf[0] = '\0';
7144     sscanf(buf, "%d %5s", &dev_id, opt_buf);
7145     if (dev_id < 0) {
7146         pr_err("dev_id out of range\n");
7147         return -EINVAL;
7148     }
7149     if (opt_buf[0] != '\0') {
7150         if (!strcmp(opt_buf, "force")) {
7151             force = true;
7152         } else {
7153             pr_err("bad remove option at '%s'\n", opt_buf);
7154             return -EINVAL;
7155         }
7156     }
7157 
7158     ret = -ENOENT;
7159     spin_lock(&rbd_dev_list_lock);
7160     list_for_each(tmp, &rbd_dev_list) {
7161         rbd_dev = list_entry(tmp, struct rbd_device, node);
7162         if (rbd_dev->dev_id == dev_id) {
7163             ret = 0;
7164             break;
7165         }
7166     }
7167     if (!ret) {
7168         spin_lock_irq(&rbd_dev->lock);
7169         if (rbd_dev->open_count && !force)
7170             ret = -EBUSY;
7171         else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7172                       &rbd_dev->flags))
7173             ret = -EINPROGRESS;
7174         spin_unlock_irq(&rbd_dev->lock);
7175     }
7176     spin_unlock(&rbd_dev_list_lock);
7177     if (ret)
7178         return ret;
7179 
7180     if (force) {
7181         /*
7182          * Prevent new IO from being queued and wait for existing
7183          * IO to complete/fail.
7184          */
7185         blk_mq_freeze_queue(rbd_dev->disk->queue);
7186         blk_mark_disk_dead(rbd_dev->disk);
7187     }
7188 
7189     del_gendisk(rbd_dev->disk);
7190     spin_lock(&rbd_dev_list_lock);
7191     list_del_init(&rbd_dev->node);
7192     spin_unlock(&rbd_dev_list_lock);
7193     device_del(&rbd_dev->dev);
7194 
7195     rbd_dev_image_unlock(rbd_dev);
7196     rbd_dev_device_release(rbd_dev);
7197     rbd_dev_image_release(rbd_dev);
7198     rbd_dev_destroy(rbd_dev);
7199     return count;
7200 }
7201 
7202 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7203 {
7204     if (single_major)
7205         return -EINVAL;
7206 
7207     return do_rbd_remove(bus, buf, count);
7208 }
7209 
7210 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7211                      size_t count)
7212 {
7213     return do_rbd_remove(bus, buf, count);
7214 }
7215 
7216 /*
7217  * create control files in sysfs
7218  * /sys/bus/rbd/...
7219  */
7220 static int __init rbd_sysfs_init(void)
7221 {
7222     int ret;
7223 
7224     ret = device_register(&rbd_root_dev);
7225     if (ret < 0)
7226         return ret;
7227 
7228     ret = bus_register(&rbd_bus_type);
7229     if (ret < 0)
7230         device_unregister(&rbd_root_dev);
7231 
7232     return ret;
7233 }
7234 
7235 static void __exit rbd_sysfs_cleanup(void)
7236 {
7237     bus_unregister(&rbd_bus_type);
7238     device_unregister(&rbd_root_dev);
7239 }
7240 
7241 static int __init rbd_slab_init(void)
7242 {
7243     rbd_assert(!rbd_img_request_cache);
7244     rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7245     if (!rbd_img_request_cache)
7246         return -ENOMEM;
7247 
7248     rbd_assert(!rbd_obj_request_cache);
7249     rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7250     if (!rbd_obj_request_cache)
7251         goto out_err;
7252 
7253     return 0;
7254 
7255 out_err:
7256     kmem_cache_destroy(rbd_img_request_cache);
7257     rbd_img_request_cache = NULL;
7258     return -ENOMEM;
7259 }
7260 
7261 static void rbd_slab_exit(void)
7262 {
7263     rbd_assert(rbd_obj_request_cache);
7264     kmem_cache_destroy(rbd_obj_request_cache);
7265     rbd_obj_request_cache = NULL;
7266 
7267     rbd_assert(rbd_img_request_cache);
7268     kmem_cache_destroy(rbd_img_request_cache);
7269     rbd_img_request_cache = NULL;
7270 }
7271 
7272 static int __init rbd_init(void)
7273 {
7274     int rc;
7275 
7276     if (!libceph_compatible(NULL)) {
7277         rbd_warn(NULL, "libceph incompatibility (quitting)");
7278         return -EINVAL;
7279     }
7280 
7281     rc = rbd_slab_init();
7282     if (rc)
7283         return rc;
7284 
7285     /*
7286      * The number of active work items is limited by the number of
7287      * rbd devices * queue depth, so leave @max_active at default.
7288      */
7289     rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7290     if (!rbd_wq) {
7291         rc = -ENOMEM;
7292         goto err_out_slab;
7293     }
7294 
7295     if (single_major) {
7296         rbd_major = register_blkdev(0, RBD_DRV_NAME);
7297         if (rbd_major < 0) {
7298             rc = rbd_major;
7299             goto err_out_wq;
7300         }
7301     }
7302 
7303     rc = rbd_sysfs_init();
7304     if (rc)
7305         goto err_out_blkdev;
7306 
7307     if (single_major)
7308         pr_info("loaded (major %d)\n", rbd_major);
7309     else
7310         pr_info("loaded\n");
7311 
7312     return 0;
7313 
7314 err_out_blkdev:
7315     if (single_major)
7316         unregister_blkdev(rbd_major, RBD_DRV_NAME);
7317 err_out_wq:
7318     destroy_workqueue(rbd_wq);
7319 err_out_slab:
7320     rbd_slab_exit();
7321     return rc;
7322 }
7323 
7324 static void __exit rbd_exit(void)
7325 {
7326     ida_destroy(&rbd_dev_id_ida);
7327     rbd_sysfs_cleanup();
7328     if (single_major)
7329         unregister_blkdev(rbd_major, RBD_DRV_NAME);
7330     destroy_workqueue(rbd_wq);
7331     rbd_slab_exit();
7332 }
7333 
7334 module_init(rbd_init);
7335 module_exit(rbd_exit);
7336 
7337 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7338 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7339 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7340 /* following authorship retained from original osdblk.c */
7341 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7342 
7343 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7344 MODULE_LICENSE("GPL");