Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * Copyright (C) 2015, SUSE
0004  */
0005 
0006 
0007 #include <linux/module.h>
0008 #include <linux/kthread.h>
0009 #include <linux/dlm.h>
0010 #include <linux/sched.h>
0011 #include <linux/raid/md_p.h>
0012 #include "md.h"
0013 #include "md-bitmap.h"
0014 #include "md-cluster.h"
0015 
0016 #define LVB_SIZE    64
0017 #define NEW_DEV_TIMEOUT 5000
0018 
/*
 * Bookkeeping for one DLM lock used by md-cluster.
 * Instances are created by lockres_init() and released by lockres_free().
 */
struct dlm_lock_resource {
    dlm_lockspace_t *ls; /* lockspace, copied from cinfo->lockspace */
    struct dlm_lksb lksb; /* DLM status block; sb_lvbptr is allocated when an LVB is requested */
    char *name; /* lock name. */
    uint32_t flags; /* flags to pass to dlm_lock() */
    wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
    bool sync_locking_done; /* set by sync_ast(), cleared again by the waiter */
    void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
    struct mddev *mddev; /* pointing back to mddev. */
    int mode; /* mode of the last successful request; DLM_LOCK_IV after init */
};
0030 
/*
 * Sector range written, little-endian, into a lock's value block by
 * add_resync_info() and read back by read_resync_info().
 */
struct resync_info {
    __le64 lo;
    __le64 hi;
};
0035 
/*
 * md_cluster_info flags: bit numbers within md_cluster_info.state,
 * manipulated with set_bit()/clear_bit()/test_bit().
 */
#define     MD_CLUSTER_WAITING_FOR_NEWDISK      1
#define     MD_CLUSTER_SUSPEND_READ_BALANCING   2
#define     MD_CLUSTER_BEGIN_JOIN_CLUSTER       3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define     MD_CLUSTER_SEND_LOCK            4
/* Some cluster operations (such as adding a disk) must lock the
 * communication channel so as to perform extra operations
 * (update metadata) while no other operation is allowed on the
 * MD. The token needs to be locked and held until the operation
 * completes with a md_update_sb(), which would eventually release
 * the lock.
 */
#define     MD_CLUSTER_SEND_LOCKED_ALREADY      5
/* We should only receive messages after the node has joined the cluster
 * and set up all the related info such as bitmap and personality */
#define     MD_CLUSTER_ALREADY_IN_CLUSTER       6
#define     MD_CLUSTER_PENDING_RECV_EVENT       7
#define     MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD      8
0059 
/* Per-array cluster state, reachable through mddev->cluster_info. */
struct md_cluster_info {
    struct mddev *mddev; /* the md device which md_cluster_info belongs to */
    /* dlm lock space and resources for clustered raid. */
    dlm_lockspace_t *lockspace;
    int slot_number; /* our DLM slot as reported to recover_done() */
    struct completion completion; /* completed by recover_done() while joining */
    struct mutex recv_mutex; /* held by recv_daemon() and between lock_token()/unlock_comm() */
    struct dlm_lock_resource *bitmap_lockres;
    struct dlm_lock_resource **other_bitmap_lockres;
    struct dlm_lock_resource *resync_lockres;
    struct list_head suspend_list;

    spinlock_t suspend_lock;
    /* record the region which write should be suspended */
    sector_t suspend_lo;
    sector_t suspend_hi;
    int suspend_from; /* the slot which broadcast suspend_lo/hi */

    struct md_thread *recovery_thread; /* runs recover_bitmaps(); created on demand */
    unsigned long recovery_map; /* one bit per slot whose bitmap still needs recovery */
    /* communication lock resources */
    struct dlm_lock_resource *ack_lockres;
    struct dlm_lock_resource *message_lockres;
    struct dlm_lock_resource *token_lockres;
    struct dlm_lock_resource *no_new_dev_lockres;
    struct md_thread *recv_thread; /* runs recv_daemon() */
    struct completion newdisk_completion; /* waited on in process_add_new_disk() */
    wait_queue_head_t wait; /* waiters for MD_CLUSTER_SEND_LOCK */
    unsigned long state; /* MD_CLUSTER_* bit flags */
    /* record the region in RESYNCING message */
    sector_t sync_low;
    sector_t sync_hi;
};
0093 
/*
 * Values carried in cluster_msg.type; process_recvd_msg() dispatches on
 * them. The numeric values are exchanged between nodes, so the order of
 * the enumerators must not change.
 */
enum msg_type {
    METADATA_UPDATED = 0,
    RESYNCING,
    NEWDISK,
    REMOVE,
    RE_ADD,
    BITMAP_NEEDS_SYNC,
    CHANGE_CAPACITY,
    BITMAP_RESIZE,
};
0104 
/*
 * Message copied into / out of the MESSAGE lock's value block by
 * __sendmsg() and recv_daemon(). All integer fields are little-endian.
 */
struct cluster_msg {
    __le32 type; /* one of enum msg_type */
    __le32 slot; /* sender's slot, filled in by __sendmsg() as slot_number - 1 */
    /* TODO: Unionize this for smaller footprint */
    __le64 low;
    __le64 high;
    char uuid[16];
    __le32 raid_slot;
};
0114 
0115 static void sync_ast(void *arg)
0116 {
0117     struct dlm_lock_resource *res;
0118 
0119     res = arg;
0120     res->sync_locking_done = true;
0121     wake_up(&res->sync_locking);
0122 }
0123 
0124 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
0125 {
0126     int ret = 0;
0127 
0128     ret = dlm_lock(res->ls, mode, &res->lksb,
0129             res->flags, res->name, strlen(res->name),
0130             0, sync_ast, res, res->bast);
0131     if (ret)
0132         return ret;
0133     wait_event(res->sync_locking, res->sync_locking_done);
0134     res->sync_locking_done = false;
0135     if (res->lksb.sb_status == 0)
0136         res->mode = mode;
0137     return res->lksb.sb_status;
0138 }
0139 
/*
 * "Unlock" @res by converting it to the NL (null) mode through
 * dlm_lock_sync(); returns whatever dlm_lock_sync() returns.
 */
static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
    return dlm_lock_sync(res, DLM_LOCK_NL);
}
0144 
0145 /*
0146  * An variation of dlm_lock_sync, which make lock request could
0147  * be interrupted
0148  */
0149 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
0150                        struct mddev *mddev)
0151 {
0152     int ret = 0;
0153 
0154     ret = dlm_lock(res->ls, mode, &res->lksb,
0155             res->flags, res->name, strlen(res->name),
0156             0, sync_ast, res, res->bast);
0157     if (ret)
0158         return ret;
0159 
0160     wait_event(res->sync_locking, res->sync_locking_done
0161                       || kthread_should_stop()
0162                       || test_bit(MD_CLOSING, &mddev->flags));
0163     if (!res->sync_locking_done) {
0164         /*
0165          * the convert queue contains the lock request when request is
0166          * interrupted, and sync_ast could still be run, so need to
0167          * cancel the request and reset completion
0168          */
0169         ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
0170             &res->lksb, res);
0171         res->sync_locking_done = false;
0172         if (unlikely(ret != 0))
0173             pr_info("failed to cancel previous lock request "
0174                  "%s return %d\n", res->name, ret);
0175         return -EPERM;
0176     } else
0177         res->sync_locking_done = false;
0178     if (res->lksb.sb_status == 0)
0179         res->mode = mode;
0180     return res->lksb.sb_status;
0181 }
0182 
0183 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
0184         char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
0185 {
0186     struct dlm_lock_resource *res = NULL;
0187     int ret, namelen;
0188     struct md_cluster_info *cinfo = mddev->cluster_info;
0189 
0190     res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
0191     if (!res)
0192         return NULL;
0193     init_waitqueue_head(&res->sync_locking);
0194     res->sync_locking_done = false;
0195     res->ls = cinfo->lockspace;
0196     res->mddev = mddev;
0197     res->mode = DLM_LOCK_IV;
0198     namelen = strlen(name);
0199     res->name = kzalloc(namelen + 1, GFP_KERNEL);
0200     if (!res->name) {
0201         pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
0202         goto out_err;
0203     }
0204     strscpy(res->name, name, namelen + 1);
0205     if (with_lvb) {
0206         res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
0207         if (!res->lksb.sb_lvbptr) {
0208             pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
0209             goto out_err;
0210         }
0211         res->flags = DLM_LKF_VALBLK;
0212     }
0213 
0214     if (bastfn)
0215         res->bast = bastfn;
0216 
0217     res->flags |= DLM_LKF_EXPEDITE;
0218 
0219     ret = dlm_lock_sync(res, DLM_LOCK_NL);
0220     if (ret) {
0221         pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
0222         goto out_err;
0223     }
0224     res->flags &= ~DLM_LKF_EXPEDITE;
0225     res->flags |= DLM_LKF_CONVERT;
0226 
0227     return res;
0228 out_err:
0229     kfree(res->lksb.sb_lvbptr);
0230     kfree(res->name);
0231     kfree(res);
0232     return NULL;
0233 }
0234 
0235 static void lockres_free(struct dlm_lock_resource *res)
0236 {
0237     int ret = 0;
0238 
0239     if (!res)
0240         return;
0241 
0242     /*
0243      * use FORCEUNLOCK flag, so we can unlock even the lock is on the
0244      * waiting or convert queue
0245      */
0246     ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
0247         &res->lksb, res);
0248     if (unlikely(ret != 0))
0249         pr_err("failed to unlock %s return %d\n", res->name, ret);
0250     else
0251         wait_event(res->sync_locking, res->sync_locking_done);
0252 
0253     kfree(res->name);
0254     kfree(res->lksb.sb_lvbptr);
0255     kfree(res);
0256 }
0257 
0258 static void add_resync_info(struct dlm_lock_resource *lockres,
0259                 sector_t lo, sector_t hi)
0260 {
0261     struct resync_info *ri;
0262 
0263     ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
0264     ri->lo = cpu_to_le64(lo);
0265     ri->hi = cpu_to_le64(hi);
0266 }
0267 
0268 static int read_resync_info(struct mddev *mddev,
0269                 struct dlm_lock_resource *lockres)
0270 {
0271     struct resync_info ri;
0272     struct md_cluster_info *cinfo = mddev->cluster_info;
0273     int ret = 0;
0274 
0275     dlm_lock_sync(lockres, DLM_LOCK_CR);
0276     memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
0277     if (le64_to_cpu(ri.hi) > 0) {
0278         cinfo->suspend_hi = le64_to_cpu(ri.hi);
0279         cinfo->suspend_lo = le64_to_cpu(ri.lo);
0280         ret = 1;
0281     }
0282     dlm_unlock_sync(lockres);
0283     return ret;
0284 }
0285 
0286 static void recover_bitmaps(struct md_thread *thread)
0287 {
0288     struct mddev *mddev = thread->mddev;
0289     struct md_cluster_info *cinfo = mddev->cluster_info;
0290     struct dlm_lock_resource *bm_lockres;
0291     char str[64];
0292     int slot, ret;
0293     sector_t lo, hi;
0294 
0295     while (cinfo->recovery_map) {
0296         slot = fls64((u64)cinfo->recovery_map) - 1;
0297 
0298         snprintf(str, 64, "bitmap%04d", slot);
0299         bm_lockres = lockres_init(mddev, str, NULL, 1);
0300         if (!bm_lockres) {
0301             pr_err("md-cluster: Cannot initialize bitmaps\n");
0302             goto clear_bit;
0303         }
0304 
0305         ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
0306         if (ret) {
0307             pr_err("md-cluster: Could not DLM lock %s: %d\n",
0308                     str, ret);
0309             goto clear_bit;
0310         }
0311         ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
0312         if (ret) {
0313             pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
0314             goto clear_bit;
0315         }
0316 
0317         /* Clear suspend_area associated with the bitmap */
0318         spin_lock_irq(&cinfo->suspend_lock);
0319         cinfo->suspend_hi = 0;
0320         cinfo->suspend_lo = 0;
0321         cinfo->suspend_from = -1;
0322         spin_unlock_irq(&cinfo->suspend_lock);
0323 
0324         /* Kick off a reshape if needed */
0325         if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
0326             test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
0327             mddev->reshape_position != MaxSector)
0328             md_wakeup_thread(mddev->sync_thread);
0329 
0330         if (hi > 0) {
0331             if (lo < mddev->recovery_cp)
0332                 mddev->recovery_cp = lo;
0333             /* wake up thread to continue resync in case resync
0334              * is not finished */
0335             if (mddev->recovery_cp != MaxSector) {
0336                 /*
0337                  * clear the REMOTE flag since we will launch
0338                  * resync thread in current node.
0339                  */
0340                 clear_bit(MD_RESYNCING_REMOTE,
0341                       &mddev->recovery);
0342                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
0343                 md_wakeup_thread(mddev->thread);
0344             }
0345         }
0346 clear_bit:
0347         lockres_free(bm_lockres);
0348         clear_bit(slot, &cinfo->recovery_map);
0349     }
0350 }
0351 
0352 static void recover_prep(void *arg)
0353 {
0354     struct mddev *mddev = arg;
0355     struct md_cluster_info *cinfo = mddev->cluster_info;
0356     set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
0357 }
0358 
0359 static void __recover_slot(struct mddev *mddev, int slot)
0360 {
0361     struct md_cluster_info *cinfo = mddev->cluster_info;
0362 
0363     set_bit(slot, &cinfo->recovery_map);
0364     if (!cinfo->recovery_thread) {
0365         cinfo->recovery_thread = md_register_thread(recover_bitmaps,
0366                 mddev, "recover");
0367         if (!cinfo->recovery_thread) {
0368             pr_warn("md-cluster: Could not create recovery thread\n");
0369             return;
0370         }
0371     }
0372     md_wakeup_thread(cinfo->recovery_thread);
0373 }
0374 
0375 static void recover_slot(void *arg, struct dlm_slot *slot)
0376 {
0377     struct mddev *mddev = arg;
0378     struct md_cluster_info *cinfo = mddev->cluster_info;
0379 
0380     pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
0381             mddev->bitmap_info.cluster_name,
0382             slot->nodeid, slot->slot,
0383             cinfo->slot_number);
0384     /* deduct one since dlm slot starts from one while the num of
0385      * cluster-md begins with 0 */
0386     __recover_slot(mddev, slot->slot - 1);
0387 }
0388 
0389 static void recover_done(void *arg, struct dlm_slot *slots,
0390         int num_slots, int our_slot,
0391         uint32_t generation)
0392 {
0393     struct mddev *mddev = arg;
0394     struct md_cluster_info *cinfo = mddev->cluster_info;
0395 
0396     cinfo->slot_number = our_slot;
0397     /* completion is only need to be complete when node join cluster,
0398      * it doesn't need to run during another node's failure */
0399     if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
0400         complete(&cinfo->completion);
0401         clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
0402     }
0403     clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
0404 }
0405 
/*
 * Lockspace callbacks passed to dlm_new_lockspace(). They are called when
 * this node joins the cluster, and they perform lock recovery if a node
 * failure occurs.
 */
static const struct dlm_lockspace_ops md_ls_ops = {
    .recover_prep = recover_prep,
    .recover_slot = recover_slot,
    .recover_done = recover_done,
};
0413 
0414 /*
0415  * The BAST function for the ack lock resource
0416  * This function wakes up the receive thread in
0417  * order to receive and process the message.
0418  */
0419 static void ack_bast(void *arg, int mode)
0420 {
0421     struct dlm_lock_resource *res = arg;
0422     struct md_cluster_info *cinfo = res->mddev->cluster_info;
0423 
0424     if (mode == DLM_LOCK_EX) {
0425         if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
0426             md_wakeup_thread(cinfo->recv_thread);
0427         else
0428             set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
0429     }
0430 }
0431 
0432 static void remove_suspend_info(struct mddev *mddev, int slot)
0433 {
0434     struct md_cluster_info *cinfo = mddev->cluster_info;
0435     mddev->pers->quiesce(mddev, 1);
0436     spin_lock_irq(&cinfo->suspend_lock);
0437     cinfo->suspend_hi = 0;
0438     cinfo->suspend_lo = 0;
0439     spin_unlock_irq(&cinfo->suspend_lock);
0440     mddev->pers->quiesce(mddev, 0);
0441 }
0442 
0443 static void process_suspend_info(struct mddev *mddev,
0444         int slot, sector_t lo, sector_t hi)
0445 {
0446     struct md_cluster_info *cinfo = mddev->cluster_info;
0447     struct mdp_superblock_1 *sb = NULL;
0448     struct md_rdev *rdev;
0449 
0450     if (!hi) {
0451         /*
0452          * clear the REMOTE flag since resync or recovery is finished
0453          * in remote node.
0454          */
0455         clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
0456         remove_suspend_info(mddev, slot);
0457         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
0458         md_wakeup_thread(mddev->thread);
0459         return;
0460     }
0461 
0462     rdev_for_each(rdev, mddev)
0463         if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
0464             sb = page_address(rdev->sb_page);
0465             break;
0466         }
0467 
0468     /*
0469      * The bitmaps are not same for different nodes
0470      * if RESYNCING is happening in one node, then
0471      * the node which received the RESYNCING message
0472      * probably will perform resync with the region
0473      * [lo, hi] again, so we could reduce resync time
0474      * a lot if we can ensure that the bitmaps among
0475      * different nodes are match up well.
0476      *
0477      * sync_low/hi is used to record the region which
0478      * arrived in the previous RESYNCING message,
0479      *
0480      * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
0481      * and set RESYNC_MASK since  resync thread is running
0482      * in another node, so we don't need to do the resync
0483      * again with the same section.
0484      *
0485      * Skip md_bitmap_sync_with_cluster in case reshape
0486      * happening, because reshaping region is small and
0487      * we don't want to trigger lots of WARN.
0488      */
0489     if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
0490         md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
0491                         cinfo->sync_hi, lo, hi);
0492     cinfo->sync_low = lo;
0493     cinfo->sync_hi = hi;
0494 
0495     mddev->pers->quiesce(mddev, 1);
0496     spin_lock_irq(&cinfo->suspend_lock);
0497     cinfo->suspend_from = slot;
0498     cinfo->suspend_lo = lo;
0499     cinfo->suspend_hi = hi;
0500     spin_unlock_irq(&cinfo->suspend_lock);
0501     mddev->pers->quiesce(mddev, 0);
0502 }
0503 
0504 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
0505 {
0506     char disk_uuid[64];
0507     struct md_cluster_info *cinfo = mddev->cluster_info;
0508     char event_name[] = "EVENT=ADD_DEVICE";
0509     char raid_slot[16];
0510     char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
0511     int len;
0512 
0513     len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
0514     sprintf(disk_uuid + len, "%pU", cmsg->uuid);
0515     snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
0516     pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
0517     init_completion(&cinfo->newdisk_completion);
0518     set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
0519     kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
0520     wait_for_completion_timeout(&cinfo->newdisk_completion,
0521             NEW_DEV_TIMEOUT);
0522     clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
0523 }
0524 
0525 
/*
 * Handle a METADATA_UPDATED message: record the device number carried in
 * the message in mddev->good_device_nr and call md_reload_sb() with it.
 */
static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
    int got_lock = 0;
    struct md_cluster_info *cinfo = mddev->cluster_info;
    mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

    /* NOTE(review): the result of this CR request is not checked */
    dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
    /*
     * Sleep until either mddev_trylock() succeeds here (got_lock records
     * that) or MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD is set. lock_comm() sets
     * that bit when its caller reports the mddev as already locked, and
     * wakes mddev->thread so this condition is re-evaluated.
     */
    wait_event(mddev->thread->wqueue,
           (got_lock = mddev_trylock(mddev)) ||
            test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
    md_reload_sb(mddev, mddev->good_device_nr);
    /* only drop the mddev lock if it was taken by the trylock above */
    if (got_lock)
        mddev_unlock(mddev);
}
0540 
0541 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
0542 {
0543     struct md_rdev *rdev;
0544 
0545     rcu_read_lock();
0546     rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
0547     if (rdev) {
0548         set_bit(ClusterRemove, &rdev->flags);
0549         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
0550         md_wakeup_thread(mddev->thread);
0551     }
0552     else
0553         pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
0554             __func__, __LINE__, le32_to_cpu(msg->raid_slot));
0555     rcu_read_unlock();
0556 }
0557 
0558 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
0559 {
0560     struct md_rdev *rdev;
0561 
0562     rcu_read_lock();
0563     rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
0564     if (rdev && test_bit(Faulty, &rdev->flags))
0565         clear_bit(Faulty, &rdev->flags);
0566     else
0567         pr_warn("%s: %d Could not find disk(%d) which is faulty",
0568             __func__, __LINE__, le32_to_cpu(msg->raid_slot));
0569     rcu_read_unlock();
0570 }
0571 
0572 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
0573 {
0574     int ret = 0;
0575 
0576     if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
0577         "node %d received its own msg\n", le32_to_cpu(msg->slot)))
0578         return -1;
0579     switch (le32_to_cpu(msg->type)) {
0580     case METADATA_UPDATED:
0581         process_metadata_update(mddev, msg);
0582         break;
0583     case CHANGE_CAPACITY:
0584         set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
0585         break;
0586     case RESYNCING:
0587         set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
0588         process_suspend_info(mddev, le32_to_cpu(msg->slot),
0589                      le64_to_cpu(msg->low),
0590                      le64_to_cpu(msg->high));
0591         break;
0592     case NEWDISK:
0593         process_add_new_disk(mddev, msg);
0594         break;
0595     case REMOVE:
0596         process_remove_disk(mddev, msg);
0597         break;
0598     case RE_ADD:
0599         process_readd_disk(mddev, msg);
0600         break;
0601     case BITMAP_NEEDS_SYNC:
0602         __recover_slot(mddev, le32_to_cpu(msg->slot));
0603         break;
0604     case BITMAP_RESIZE:
0605         if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
0606             ret = md_bitmap_resize(mddev->bitmap,
0607                         le64_to_cpu(msg->high), 0, 0);
0608         break;
0609     default:
0610         ret = -1;
0611         pr_warn("%s:%d Received unknown message from %d\n",
0612             __func__, __LINE__, msg->slot);
0613     }
0614     return ret;
0615 }
0616 
/*
 * Thread function for receiving a message.
 *
 * Under recv_mutex: take CR on MESSAGE, copy the message out of its value
 * block, process it, and then walk the ACK/MESSAGE locks through the
 * conversions below before releasing MESSAGE again.
 */
static void recv_daemon(struct md_thread *thread)
{
    struct md_cluster_info *cinfo = thread->mddev->cluster_info;
    struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
    struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
    struct cluster_msg msg;
    int ret;

    mutex_lock(&cinfo->recv_mutex);
    /*get CR on Message*/
    if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
        pr_err("md/raid1:failed to get CR on MESSAGE\n");
        mutex_unlock(&cinfo->recv_mutex);
        return;
    }

    /* read lvb and wake up thread to process this message_lockres */
    memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
    ret = process_recvd_msg(thread->mddev, &msg);
    /* on failure the ACK/MESSAGE conversions below are skipped entirely */
    if (ret)
        goto out;

    /*release CR on ack_lockres*/
    ret = dlm_unlock_sync(ack_lockres);
    if (unlikely(ret != 0))
        pr_info("unlock ack failed return %d\n", ret);
    /*up-convert to PR on message_lockres*/
    ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
    if (unlikely(ret != 0))
        pr_info("lock PR on msg failed return %d\n", ret);
    /*get CR on ack_lockres again*/
    ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
    if (unlikely(ret != 0))
        pr_info("lock CR on ack failed return %d\n", ret);
out:
    /*release CR on message_lockres*/
    ret = dlm_unlock_sync(message_lockres);
    if (unlikely(ret != 0))
        pr_info("unlock msg failed return %d\n", ret);
    mutex_unlock(&cinfo->recv_mutex);
}
0661 
0662 /* lock_token()
0663  * Takes the lock on the TOKEN lock resource so no other
0664  * node can communicate while the operation is underway.
0665  */
0666 static int lock_token(struct md_cluster_info *cinfo)
0667 {
0668     int error;
0669 
0670     error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
0671     if (error) {
0672         pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
0673                 __func__, __LINE__, error);
0674     } else {
0675         /* Lock the receive sequence */
0676         mutex_lock(&cinfo->recv_mutex);
0677     }
0678     return error;
0679 }
0680 
/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel, then takes
 * the token via lock_token().
 *
 * @mddev_locked: the caller reports that it already holds the mddev lock.
 *
 * Returns the lock_token() result. Note that MD_CLUSTER_SEND_LOCK stays
 * set even when lock_token() fails; this function does not clear it.
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
    int rv, set_bit = 0;
    struct mddev *mddev = cinfo->mddev;

    /*
     * If resync thread run after raid1d thread, then process_metadata_update
     * could not continue if raid1d held reconfig_mutex (and raid1d is blocked
     * since another node already got EX on Token and waiting the EX of Ack),
     * so let resync wake up thread in case flag is set.
     */
    if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
                      &cinfo->state)) {
        rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
                          &cinfo->state);
        WARN_ON_ONCE(rv);
        /* let a waiter in process_metadata_update() re-check its condition */
        md_wakeup_thread(mddev->thread);
        /* remember that this call set the bit, so only it clears it below */
        set_bit = 1;
    }

    /* sleep until this caller is the one that flips SEND_LOCK from 0 to 1 */
    wait_event(cinfo->wait,
           !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
    rv = lock_token(cinfo);
    if (set_bit)
        clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
    return rv;
}
0711 
/*
 * Undo lock_comm(): release recv_mutex, convert the token back to NL,
 * clear MD_CLUSTER_SEND_LOCK and wake senders waiting on cinfo->wait.
 * Warns if the token is not currently recorded as held in EX mode.
 */
static void unlock_comm(struct md_cluster_info *cinfo)
{
    WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
    mutex_unlock(&cinfo->recv_mutex);
    dlm_unlock_sync(cinfo->token_lockres);
    clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
    wake_up(&cinfo->wait);
}
0720 
0721 /* __sendmsg()
0722  * This function performs the actual sending of the message. This function is
0723  * usually called after performing the encompassing operation
0724  * The function:
0725  * 1. Grabs the message lockresource in EX mode
0726  * 2. Copies the message to the message LVB
0727  * 3. Downconverts message lockresource to CW
0728  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
0729  *    and the other nodes read the message. The thread will wait here until all other
0730  *    nodes have released ack lock resource.
0731  * 5. Downconvert ack lockresource to CR
0732  */
0733 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
0734 {
0735     int error;
0736     int slot = cinfo->slot_number - 1;
0737 
0738     cmsg->slot = cpu_to_le32(slot);
0739     /*get EX on Message*/
0740     error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
0741     if (error) {
0742         pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
0743         goto failed_message;
0744     }
0745 
0746     memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
0747             sizeof(struct cluster_msg));
0748     /*down-convert EX to CW on Message*/
0749     error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
0750     if (error) {
0751         pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
0752                 error);
0753         goto failed_ack;
0754     }
0755 
0756     /*up-convert CR to EX on Ack*/
0757     error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
0758     if (error) {
0759         pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
0760                 error);
0761         goto failed_ack;
0762     }
0763 
0764     /*down-convert EX to CR on Ack*/
0765     error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
0766     if (error) {
0767         pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
0768                 error);
0769         goto failed_ack;
0770     }
0771 
0772 failed_ack:
0773     error = dlm_unlock_sync(cinfo->message_lockres);
0774     if (unlikely(error != 0)) {
0775         pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
0776             error);
0777         /* in case the message can't be released due to some reason */
0778         goto failed_ack;
0779     }
0780 failed_message:
0781     return error;
0782 }
0783 
0784 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
0785            bool mddev_locked)
0786 {
0787     int ret;
0788 
0789     ret = lock_comm(cinfo, mddev_locked);
0790     if (!ret) {
0791         ret = __sendmsg(cinfo, cmsg);
0792         unlock_comm(cinfo);
0793     }
0794     return ret;
0795 }
0796 
0797 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
0798 {
0799     struct md_cluster_info *cinfo = mddev->cluster_info;
0800     int i, ret = 0;
0801     struct dlm_lock_resource *bm_lockres;
0802     char str[64];
0803     sector_t lo, hi;
0804 
0805 
0806     for (i = 0; i < total_slots; i++) {
0807         memset(str, '\0', 64);
0808         snprintf(str, 64, "bitmap%04d", i);
0809         bm_lockres = lockres_init(mddev, str, NULL, 1);
0810         if (!bm_lockres)
0811             return -ENOMEM;
0812         if (i == (cinfo->slot_number - 1)) {
0813             lockres_free(bm_lockres);
0814             continue;
0815         }
0816 
0817         bm_lockres->flags |= DLM_LKF_NOQUEUE;
0818         ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
0819         if (ret == -EAGAIN) {
0820             if (read_resync_info(mddev, bm_lockres)) {
0821                 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
0822                         __func__, __LINE__,
0823                     (unsigned long long) cinfo->suspend_lo,
0824                     (unsigned long long) cinfo->suspend_hi,
0825                     i);
0826                 cinfo->suspend_from = i;
0827             }
0828             ret = 0;
0829             lockres_free(bm_lockres);
0830             continue;
0831         }
0832         if (ret) {
0833             lockres_free(bm_lockres);
0834             goto out;
0835         }
0836 
0837         /* Read the disk bitmap sb and check if it needs recovery */
0838         ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
0839         if (ret) {
0840             pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
0841             lockres_free(bm_lockres);
0842             continue;
0843         }
0844         if ((hi > 0) && (lo < mddev->recovery_cp)) {
0845             set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
0846             mddev->recovery_cp = lo;
0847             md_check_recovery(mddev);
0848         }
0849 
0850         lockres_free(bm_lockres);
0851     }
0852 out:
0853     return ret;
0854 }
0855 
0856 static int join(struct mddev *mddev, int nodes)
0857 {
0858     struct md_cluster_info *cinfo;
0859     int ret, ops_rv;
0860     char str[64];
0861 
0862     cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
0863     if (!cinfo)
0864         return -ENOMEM;
0865 
0866     INIT_LIST_HEAD(&cinfo->suspend_list);
0867     spin_lock_init(&cinfo->suspend_lock);
0868     init_completion(&cinfo->completion);
0869     set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
0870     init_waitqueue_head(&cinfo->wait);
0871     mutex_init(&cinfo->recv_mutex);
0872 
0873     mddev->cluster_info = cinfo;
0874     cinfo->mddev = mddev;
0875 
0876     memset(str, 0, 64);
0877     sprintf(str, "%pU", mddev->uuid);
0878     ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
0879                 DLM_LSFL_FS, LVB_SIZE,
0880                 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
0881     if (ret)
0882         goto err;
0883     wait_for_completion(&cinfo->completion);
0884     if (nodes < cinfo->slot_number) {
0885         pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
0886             cinfo->slot_number, nodes);
0887         ret = -ERANGE;
0888         goto err;
0889     }
0890     /* Initiate the communication resources */
0891     ret = -ENOMEM;
0892     cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
0893     if (!cinfo->recv_thread) {
0894         pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
0895         goto err;
0896     }
0897     cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
0898     if (!cinfo->message_lockres)
0899         goto err;
0900     cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
0901     if (!cinfo->token_lockres)
0902         goto err;
0903     cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
0904     if (!cinfo->no_new_dev_lockres)
0905         goto err;
0906 
0907     ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
0908     if (ret) {
0909         ret = -EAGAIN;
0910         pr_err("md-cluster: can't join cluster to avoid lock issue\n");
0911         goto err;
0912     }
0913     cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
0914     if (!cinfo->ack_lockres) {
0915         ret = -ENOMEM;
0916         goto err;
0917     }
0918     /* get sync CR lock on ACK. */
0919     if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
0920         pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
0921                 ret);
0922     dlm_unlock_sync(cinfo->token_lockres);
0923     /* get sync CR lock on no-new-dev. */
0924     if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
0925         pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
0926 
0927 
0928     pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
0929     snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
0930     cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
0931     if (!cinfo->bitmap_lockres) {
0932         ret = -ENOMEM;
0933         goto err;
0934     }
0935     if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
0936         pr_err("Failed to get bitmap lock\n");
0937         ret = -EINVAL;
0938         goto err;
0939     }
0940 
0941     cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
0942     if (!cinfo->resync_lockres) {
0943         ret = -ENOMEM;
0944         goto err;
0945     }
0946 
0947     return 0;
0948 err:
0949     set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
0950     md_unregister_thread(&cinfo->recovery_thread);
0951     md_unregister_thread(&cinfo->recv_thread);
0952     lockres_free(cinfo->message_lockres);
0953     lockres_free(cinfo->token_lockres);
0954     lockres_free(cinfo->ack_lockres);
0955     lockres_free(cinfo->no_new_dev_lockres);
0956     lockres_free(cinfo->resync_lockres);
0957     lockres_free(cinfo->bitmap_lockres);
0958     if (cinfo->lockspace)
0959         dlm_release_lockspace(cinfo->lockspace, 2);
0960     mddev->cluster_info = NULL;
0961     kfree(cinfo);
0962     return ret;
0963 }
0964 
0965 static void load_bitmaps(struct mddev *mddev, int total_slots)
0966 {
0967     struct md_cluster_info *cinfo = mddev->cluster_info;
0968 
0969     /* load all the node's bitmap info for resync */
0970     if (gather_all_resync_info(mddev, total_slots))
0971         pr_err("md-cluster: failed to gather all resyn infos\n");
0972     set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
0973     /* wake up recv thread in case something need to be handled */
0974     if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
0975         md_wakeup_thread(cinfo->recv_thread);
0976 }
0977 
0978 static void resync_bitmap(struct mddev *mddev)
0979 {
0980     struct md_cluster_info *cinfo = mddev->cluster_info;
0981     struct cluster_msg cmsg = {0};
0982     int err;
0983 
0984     cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
0985     err = sendmsg(cinfo, &cmsg, 1);
0986     if (err)
0987         pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
0988             __func__, __LINE__, err);
0989 }
0990 
0991 static void unlock_all_bitmaps(struct mddev *mddev);
0992 static int leave(struct mddev *mddev)
0993 {
0994     struct md_cluster_info *cinfo = mddev->cluster_info;
0995 
0996     if (!cinfo)
0997         return 0;
0998 
0999     /*
1000      * BITMAP_NEEDS_SYNC message should be sent when node
1001      * is leaving the cluster with dirty bitmap, also we
1002      * can only deliver it when dlm connection is available.
1003      *
1004      * Also, we should send BITMAP_NEEDS_SYNC message in
1005      * case reshaping is interrupted.
1006      */
1007     if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
1008         (mddev->reshape_position != MaxSector &&
1009          test_bit(MD_CLOSING, &mddev->flags)))
1010         resync_bitmap(mddev);
1011 
1012     set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1013     md_unregister_thread(&cinfo->recovery_thread);
1014     md_unregister_thread(&cinfo->recv_thread);
1015     lockres_free(cinfo->message_lockres);
1016     lockres_free(cinfo->token_lockres);
1017     lockres_free(cinfo->ack_lockres);
1018     lockres_free(cinfo->no_new_dev_lockres);
1019     lockres_free(cinfo->resync_lockres);
1020     lockres_free(cinfo->bitmap_lockres);
1021     unlock_all_bitmaps(mddev);
1022     dlm_release_lockspace(cinfo->lockspace, 2);
1023     kfree(cinfo);
1024     return 0;
1025 }
1026 
1027 /* slot_number(): Returns the MD slot number to use
1028  * DLM starts the slot numbers from 1, wheras cluster-md
1029  * wants the number to be from zero, so we deduct one
1030  */
1031 static int slot_number(struct mddev *mddev)
1032 {
1033     struct md_cluster_info *cinfo = mddev->cluster_info;
1034 
1035     return cinfo->slot_number - 1;
1036 }
1037 
1038 /*
1039  * Check if the communication is already locked, else lock the communication
1040  * channel.
1041  * If it is already locked, token is in EX mode, and hence lock_token()
1042  * should not be called.
1043  */
1044 static int metadata_update_start(struct mddev *mddev)
1045 {
1046     struct md_cluster_info *cinfo = mddev->cluster_info;
1047     int ret;
1048 
1049     /*
1050      * metadata_update_start is always called with the protection of
1051      * reconfig_mutex, so set WAITING_FOR_TOKEN here.
1052      */
1053     ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1054                     &cinfo->state);
1055     WARN_ON_ONCE(ret);
1056     md_wakeup_thread(mddev->thread);
1057 
1058     wait_event(cinfo->wait,
1059            !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1060            test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1061 
1062     /* If token is already locked, return 0 */
1063     if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1064         clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1065         return 0;
1066     }
1067 
1068     ret = lock_token(cinfo);
1069     clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1070     return ret;
1071 }
1072 
1073 static int metadata_update_finish(struct mddev *mddev)
1074 {
1075     struct md_cluster_info *cinfo = mddev->cluster_info;
1076     struct cluster_msg cmsg;
1077     struct md_rdev *rdev;
1078     int ret = 0;
1079     int raid_slot = -1;
1080 
1081     memset(&cmsg, 0, sizeof(cmsg));
1082     cmsg.type = cpu_to_le32(METADATA_UPDATED);
1083     /* Pick up a good active device number to send.
1084      */
1085     rdev_for_each(rdev, mddev)
1086         if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1087             raid_slot = rdev->desc_nr;
1088             break;
1089         }
1090     if (raid_slot >= 0) {
1091         cmsg.raid_slot = cpu_to_le32(raid_slot);
1092         ret = __sendmsg(cinfo, &cmsg);
1093     } else
1094         pr_warn("md-cluster: No good device id found to send\n");
1095     clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1096     unlock_comm(cinfo);
1097     return ret;
1098 }
1099 
1100 static void metadata_update_cancel(struct mddev *mddev)
1101 {
1102     struct md_cluster_info *cinfo = mddev->cluster_info;
1103     clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1104     unlock_comm(cinfo);
1105 }
1106 
1107 static int update_bitmap_size(struct mddev *mddev, sector_t size)
1108 {
1109     struct md_cluster_info *cinfo = mddev->cluster_info;
1110     struct cluster_msg cmsg = {0};
1111     int ret;
1112 
1113     cmsg.type = cpu_to_le32(BITMAP_RESIZE);
1114     cmsg.high = cpu_to_le64(size);
1115     ret = sendmsg(cinfo, &cmsg, 0);
1116     if (ret)
1117         pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
1118             __func__, __LINE__, ret);
1119     return ret;
1120 }
1121 
1122 static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
1123 {
1124     struct bitmap_counts *counts;
1125     char str[64];
1126     struct dlm_lock_resource *bm_lockres;
1127     struct bitmap *bitmap = mddev->bitmap;
1128     unsigned long my_pages = bitmap->counts.pages;
1129     int i, rv;
1130 
1131     /*
1132      * We need to ensure all the nodes can grow to a larger
1133      * bitmap size before make the reshaping.
1134      */
1135     rv = update_bitmap_size(mddev, newsize);
1136     if (rv)
1137         return rv;
1138 
1139     for (i = 0; i < mddev->bitmap_info.nodes; i++) {
1140         if (i == md_cluster_ops->slot_number(mddev))
1141             continue;
1142 
1143         bitmap = get_bitmap_from_slot(mddev, i);
1144         if (IS_ERR(bitmap)) {
1145             pr_err("can't get bitmap from slot %d\n", i);
1146             bitmap = NULL;
1147             goto out;
1148         }
1149         counts = &bitmap->counts;
1150 
1151         /*
1152          * If we can hold the bitmap lock of one node then
1153          * the slot is not occupied, update the pages.
1154          */
1155         snprintf(str, 64, "bitmap%04d", i);
1156         bm_lockres = lockres_init(mddev, str, NULL, 1);
1157         if (!bm_lockres) {
1158             pr_err("Cannot initialize %s lock\n", str);
1159             goto out;
1160         }
1161         bm_lockres->flags |= DLM_LKF_NOQUEUE;
1162         rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1163         if (!rv)
1164             counts->pages = my_pages;
1165         lockres_free(bm_lockres);
1166 
1167         if (my_pages != counts->pages)
1168             /*
1169              * Let's revert the bitmap size if one node
1170              * can't resize bitmap
1171              */
1172             goto out;
1173         md_bitmap_free(bitmap);
1174     }
1175 
1176     return 0;
1177 out:
1178     md_bitmap_free(bitmap);
1179     update_bitmap_size(mddev, oldsize);
1180     return -1;
1181 }
1182 
1183 /*
1184  * return 0 if all the bitmaps have the same sync_size
1185  */
1186 static int cluster_check_sync_size(struct mddev *mddev)
1187 {
1188     int i, rv;
1189     bitmap_super_t *sb;
1190     unsigned long my_sync_size, sync_size = 0;
1191     int node_num = mddev->bitmap_info.nodes;
1192     int current_slot = md_cluster_ops->slot_number(mddev);
1193     struct bitmap *bitmap = mddev->bitmap;
1194     char str[64];
1195     struct dlm_lock_resource *bm_lockres;
1196 
1197     sb = kmap_atomic(bitmap->storage.sb_page);
1198     my_sync_size = sb->sync_size;
1199     kunmap_atomic(sb);
1200 
1201     for (i = 0; i < node_num; i++) {
1202         if (i == current_slot)
1203             continue;
1204 
1205         bitmap = get_bitmap_from_slot(mddev, i);
1206         if (IS_ERR(bitmap)) {
1207             pr_err("can't get bitmap from slot %d\n", i);
1208             return -1;
1209         }
1210 
1211         /*
1212          * If we can hold the bitmap lock of one node then
1213          * the slot is not occupied, update the sb.
1214          */
1215         snprintf(str, 64, "bitmap%04d", i);
1216         bm_lockres = lockres_init(mddev, str, NULL, 1);
1217         if (!bm_lockres) {
1218             pr_err("md-cluster: Cannot initialize %s\n", str);
1219             md_bitmap_free(bitmap);
1220             return -1;
1221         }
1222         bm_lockres->flags |= DLM_LKF_NOQUEUE;
1223         rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1224         if (!rv)
1225             md_bitmap_update_sb(bitmap);
1226         lockres_free(bm_lockres);
1227 
1228         sb = kmap_atomic(bitmap->storage.sb_page);
1229         if (sync_size == 0)
1230             sync_size = sb->sync_size;
1231         else if (sync_size != sb->sync_size) {
1232             kunmap_atomic(sb);
1233             md_bitmap_free(bitmap);
1234             return -1;
1235         }
1236         kunmap_atomic(sb);
1237         md_bitmap_free(bitmap);
1238     }
1239 
1240     return (my_sync_size == sync_size) ? 0 : -1;
1241 }
1242 
1243 /*
1244  * Update the size for cluster raid is a little more complex, we perform it
1245  * by the steps:
1246  * 1. hold token lock and update superblock in initiator node.
1247  * 2. send METADATA_UPDATED msg to other nodes.
1248  * 3. The initiator node continues to check each bitmap's sync_size, if all
1249  *    bitmaps have the same value of sync_size, then we can set capacity and
1250  *    let other nodes to perform it. If one node can't update sync_size
1251  *    accordingly, we need to revert to previous value.
1252  */
1253 static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
1254 {
1255     struct md_cluster_info *cinfo = mddev->cluster_info;
1256     struct cluster_msg cmsg;
1257     struct md_rdev *rdev;
1258     int ret = 0;
1259     int raid_slot = -1;
1260 
1261     md_update_sb(mddev, 1);
1262     if (lock_comm(cinfo, 1)) {
1263         pr_err("%s: lock_comm failed\n", __func__);
1264         return;
1265     }
1266 
1267     memset(&cmsg, 0, sizeof(cmsg));
1268     cmsg.type = cpu_to_le32(METADATA_UPDATED);
1269     rdev_for_each(rdev, mddev)
1270         if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
1271             raid_slot = rdev->desc_nr;
1272             break;
1273         }
1274     if (raid_slot >= 0) {
1275         cmsg.raid_slot = cpu_to_le32(raid_slot);
1276         /*
1277          * We can only change capiticy after all the nodes can do it,
1278          * so need to wait after other nodes already received the msg
1279          * and handled the change
1280          */
1281         ret = __sendmsg(cinfo, &cmsg);
1282         if (ret) {
1283             pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1284                    __func__, __LINE__);
1285             unlock_comm(cinfo);
1286             return;
1287         }
1288     } else {
1289         pr_err("md-cluster: No good device id found to send\n");
1290         unlock_comm(cinfo);
1291         return;
1292     }
1293 
1294     /*
1295      * check the sync_size from other node's bitmap, if sync_size
1296      * have already updated in other nodes as expected, send an
1297      * empty metadata msg to permit the change of capacity
1298      */
1299     if (cluster_check_sync_size(mddev) == 0) {
1300         memset(&cmsg, 0, sizeof(cmsg));
1301         cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
1302         ret = __sendmsg(cinfo, &cmsg);
1303         if (ret)
1304             pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
1305                    __func__, __LINE__);
1306         set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
1307     } else {
1308         /* revert to previous sectors */
1309         ret = mddev->pers->resize(mddev, old_dev_sectors);
1310         ret = __sendmsg(cinfo, &cmsg);
1311         if (ret)
1312             pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1313                    __func__, __LINE__);
1314     }
1315     unlock_comm(cinfo);
1316 }
1317 
1318 static int resync_start(struct mddev *mddev)
1319 {
1320     struct md_cluster_info *cinfo = mddev->cluster_info;
1321     return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1322 }
1323 
1324 static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
1325 {
1326     struct md_cluster_info *cinfo = mddev->cluster_info;
1327 
1328     spin_lock_irq(&cinfo->suspend_lock);
1329     *lo = cinfo->suspend_lo;
1330     *hi = cinfo->suspend_hi;
1331     spin_unlock_irq(&cinfo->suspend_lock);
1332 }
1333 
1334 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1335 {
1336     struct md_cluster_info *cinfo = mddev->cluster_info;
1337     struct resync_info ri;
1338     struct cluster_msg cmsg = {0};
1339 
1340     /* do not send zero again, if we have sent before */
1341     if (hi == 0) {
1342         memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1343         if (le64_to_cpu(ri.hi) == 0)
1344             return 0;
1345     }
1346 
1347     add_resync_info(cinfo->bitmap_lockres, lo, hi);
1348     /* Re-acquire the lock to refresh LVB */
1349     dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1350     cmsg.type = cpu_to_le32(RESYNCING);
1351     cmsg.low = cpu_to_le64(lo);
1352     cmsg.high = cpu_to_le64(hi);
1353 
1354     /*
1355      * mddev_lock is held if resync_info_update is called from
1356      * resync_finish (md_reap_sync_thread -> resync_finish)
1357      */
1358     if (lo == 0 && hi == 0)
1359         return sendmsg(cinfo, &cmsg, 1);
1360     else
1361         return sendmsg(cinfo, &cmsg, 0);
1362 }
1363 
1364 static int resync_finish(struct mddev *mddev)
1365 {
1366     struct md_cluster_info *cinfo = mddev->cluster_info;
1367     int ret = 0;
1368 
1369     clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
1370 
1371     /*
1372      * If resync thread is interrupted so we can't say resync is finished,
1373      * another node will launch resync thread to continue.
1374      */
1375     if (!test_bit(MD_CLOSING, &mddev->flags))
1376         ret = resync_info_update(mddev, 0, 0);
1377     dlm_unlock_sync(cinfo->resync_lockres);
1378     return ret;
1379 }
1380 
1381 static int area_resyncing(struct mddev *mddev, int direction,
1382         sector_t lo, sector_t hi)
1383 {
1384     struct md_cluster_info *cinfo = mddev->cluster_info;
1385     int ret = 0;
1386 
1387     if ((direction == READ) &&
1388         test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1389         return 1;
1390 
1391     spin_lock_irq(&cinfo->suspend_lock);
1392     if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
1393         ret = 1;
1394     spin_unlock_irq(&cinfo->suspend_lock);
1395     return ret;
1396 }
1397 
1398 /* add_new_disk() - initiates a disk add
1399  * However, if this fails before writing md_update_sb(),
1400  * add_new_disk_cancel() must be called to release token lock
1401  */
1402 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1403 {
1404     struct md_cluster_info *cinfo = mddev->cluster_info;
1405     struct cluster_msg cmsg;
1406     int ret = 0;
1407     struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1408     char *uuid = sb->device_uuid;
1409 
1410     memset(&cmsg, 0, sizeof(cmsg));
1411     cmsg.type = cpu_to_le32(NEWDISK);
1412     memcpy(cmsg.uuid, uuid, 16);
1413     cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1414     if (lock_comm(cinfo, 1))
1415         return -EAGAIN;
1416     ret = __sendmsg(cinfo, &cmsg);
1417     if (ret) {
1418         unlock_comm(cinfo);
1419         return ret;
1420     }
1421     cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1422     ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1423     cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1424     /* Some node does not "see" the device */
1425     if (ret == -EAGAIN)
1426         ret = -ENOENT;
1427     if (ret)
1428         unlock_comm(cinfo);
1429     else {
1430         dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1431         /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1432          * will run soon after add_new_disk, the below path will be
1433          * invoked:
1434          *   md_wakeup_thread(mddev->thread)
1435          *  -> conf->thread (raid1d)
1436          *  -> md_check_recovery -> md_update_sb
1437          *  -> metadata_update_start/finish
1438          * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1439          *
1440          * For other failure cases, metadata_update_cancel and
1441          * add_new_disk_cancel also clear below bit as well.
1442          * */
1443         set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1444         wake_up(&cinfo->wait);
1445     }
1446     return ret;
1447 }
1448 
1449 static void add_new_disk_cancel(struct mddev *mddev)
1450 {
1451     struct md_cluster_info *cinfo = mddev->cluster_info;
1452     clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1453     unlock_comm(cinfo);
1454 }
1455 
1456 static int new_disk_ack(struct mddev *mddev, bool ack)
1457 {
1458     struct md_cluster_info *cinfo = mddev->cluster_info;
1459 
1460     if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1461         pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1462         return -EINVAL;
1463     }
1464 
1465     if (ack)
1466         dlm_unlock_sync(cinfo->no_new_dev_lockres);
1467     complete(&cinfo->newdisk_completion);
1468     return 0;
1469 }
1470 
1471 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1472 {
1473     struct cluster_msg cmsg = {0};
1474     struct md_cluster_info *cinfo = mddev->cluster_info;
1475     cmsg.type = cpu_to_le32(REMOVE);
1476     cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1477     return sendmsg(cinfo, &cmsg, 1);
1478 }
1479 
1480 static int lock_all_bitmaps(struct mddev *mddev)
1481 {
1482     int slot, my_slot, ret, held = 1, i = 0;
1483     char str[64];
1484     struct md_cluster_info *cinfo = mddev->cluster_info;
1485 
1486     cinfo->other_bitmap_lockres =
1487         kcalloc(mddev->bitmap_info.nodes - 1,
1488             sizeof(struct dlm_lock_resource *), GFP_KERNEL);
1489     if (!cinfo->other_bitmap_lockres) {
1490         pr_err("md: can't alloc mem for other bitmap locks\n");
1491         return 0;
1492     }
1493 
1494     my_slot = slot_number(mddev);
1495     for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1496         if (slot == my_slot)
1497             continue;
1498 
1499         memset(str, '\0', 64);
1500         snprintf(str, 64, "bitmap%04d", slot);
1501         cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1502         if (!cinfo->other_bitmap_lockres[i])
1503             return -ENOMEM;
1504 
1505         cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1506         ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1507         if (ret)
1508             held = -1;
1509         i++;
1510     }
1511 
1512     return held;
1513 }
1514 
1515 static void unlock_all_bitmaps(struct mddev *mddev)
1516 {
1517     struct md_cluster_info *cinfo = mddev->cluster_info;
1518     int i;
1519 
1520     /* release other node's bitmap lock if they are existed */
1521     if (cinfo->other_bitmap_lockres) {
1522         for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1523             if (cinfo->other_bitmap_lockres[i]) {
1524                 lockres_free(cinfo->other_bitmap_lockres[i]);
1525             }
1526         }
1527         kfree(cinfo->other_bitmap_lockres);
1528         cinfo->other_bitmap_lockres = NULL;
1529     }
1530 }
1531 
1532 static int gather_bitmaps(struct md_rdev *rdev)
1533 {
1534     int sn, err;
1535     sector_t lo, hi;
1536     struct cluster_msg cmsg = {0};
1537     struct mddev *mddev = rdev->mddev;
1538     struct md_cluster_info *cinfo = mddev->cluster_info;
1539 
1540     cmsg.type = cpu_to_le32(RE_ADD);
1541     cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1542     err = sendmsg(cinfo, &cmsg, 1);
1543     if (err)
1544         goto out;
1545 
1546     for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1547         if (sn == (cinfo->slot_number - 1))
1548             continue;
1549         err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1550         if (err) {
1551             pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1552             goto out;
1553         }
1554         if ((hi > 0) && (lo < mddev->recovery_cp))
1555             mddev->recovery_cp = lo;
1556     }
1557 out:
1558     return err;
1559 }
1560 
1561 static struct md_cluster_operations cluster_ops = {
1562     .join   = join,
1563     .leave  = leave,
1564     .slot_number = slot_number,
1565     .resync_start = resync_start,
1566     .resync_finish = resync_finish,
1567     .resync_info_update = resync_info_update,
1568     .resync_info_get = resync_info_get,
1569     .metadata_update_start = metadata_update_start,
1570     .metadata_update_finish = metadata_update_finish,
1571     .metadata_update_cancel = metadata_update_cancel,
1572     .area_resyncing = area_resyncing,
1573     .add_new_disk = add_new_disk,
1574     .add_new_disk_cancel = add_new_disk_cancel,
1575     .new_disk_ack = new_disk_ack,
1576     .remove_disk = remove_disk,
1577     .load_bitmaps = load_bitmaps,
1578     .gather_bitmaps = gather_bitmaps,
1579     .resize_bitmaps = resize_bitmaps,
1580     .lock_all_bitmaps = lock_all_bitmaps,
1581     .unlock_all_bitmaps = unlock_all_bitmaps,
1582     .update_size = update_size,
1583 };
1584 
1585 static int __init cluster_init(void)
1586 {
1587     pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
1588     pr_info("Registering Cluster MD functions\n");
1589     register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1590     return 0;
1591 }
1592 
1593 static void cluster_exit(void)
1594 {
1595     unregister_md_cluster_operations();
1596 }
1597 
1598 module_init(cluster_init);
1599 module_exit(cluster_exit);
1600 MODULE_AUTHOR("SUSE");
1601 MODULE_LICENSE("GPL");
1602 MODULE_DESCRIPTION("Clustering support for MD");