Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * dlmconvert.c
0004  *
0005  * underlying calls for lock conversion
0006  *
0007  * Copyright (C) 2004 Oracle.  All rights reserved.
0008  */
0009 
0010 
0011 #include <linux/module.h>
0012 #include <linux/fs.h>
0013 #include <linux/types.h>
0014 #include <linux/highmem.h>
0015 #include <linux/init.h>
0016 #include <linux/sysctl.h>
0017 #include <linux/random.h>
0018 #include <linux/blkdev.h>
0019 #include <linux/socket.h>
0020 #include <linux/inet.h>
0021 #include <linux/spinlock.h>
0022 
0023 
0024 #include "../cluster/heartbeat.h"
0025 #include "../cluster/nodemanager.h"
0026 #include "../cluster/tcp.h"
0027 
0028 #include "dlmapi.h"
0029 #include "dlmcommon.h"
0030 
0031 #include "dlmconvert.h"
0032 
0033 #define MLOG_MASK_PREFIX ML_DLM
0034 #include "../cluster/masklog.h"
0035 
/*
 * Locking summary for this file:
 *
 * __dlmconvert_master is the only function here that must be entered with
 * a spinlock held (res->spinlock), and the only one that still holds a
 * lock (res->spinlock) when it returns.  All other functions in here need
 * no locks on entry and drop every lock they acquire before returning.
 */
static enum dlm_status
__dlmconvert_master(struct dlm_ctxt *dlm,
                    struct dlm_lock_resource *res,
                    struct dlm_lock *lock,
                    int flags, int type,
                    int *call_ast, int *kick_thread);

static enum dlm_status
dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res,
                                struct dlm_lock *lock,
                                int flags, int type);
0049 
0050 /*
0051  * this is only called directly by dlmlock(), and only when the
0052  * local node is the owner of the lockres
0053  * locking:
0054  *   caller needs:  none
0055  *   taken:         takes and drops res->spinlock
0056  *   held on exit:  none
0057  * returns: see __dlmconvert_master
0058  */
0059 enum dlm_status dlmconvert_master(struct dlm_ctxt *dlm,
0060                   struct dlm_lock_resource *res,
0061                   struct dlm_lock *lock, int flags, int type)
0062 {
0063     int call_ast = 0, kick_thread = 0;
0064     enum dlm_status status;
0065 
0066     spin_lock(&res->spinlock);
0067     /* we are not in a network handler, this is fine */
0068     __dlm_wait_on_lockres(res);
0069     __dlm_lockres_reserve_ast(res);
0070     res->state |= DLM_LOCK_RES_IN_PROGRESS;
0071 
0072     status = __dlmconvert_master(dlm, res, lock, flags, type,
0073                      &call_ast, &kick_thread);
0074 
0075     res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0076     spin_unlock(&res->spinlock);
0077     wake_up(&res->wq);
0078     if (status != DLM_NORMAL && status != DLM_NOTQUEUED)
0079         dlm_error(status);
0080 
0081     /* either queue the ast or release it */
0082     if (call_ast)
0083         dlm_queue_ast(dlm, lock);
0084     else
0085         dlm_lockres_release_ast(dlm, res);
0086 
0087     if (kick_thread)
0088         dlm_kick_thread(dlm, res);
0089 
0090     return status;
0091 }
0092 
0093 /* performs lock conversion at the lockres master site
0094  * locking:
0095  *   caller needs:  res->spinlock
0096  *   taken:         takes and drops lock->spinlock
0097  *   held on exit:  res->spinlock
0098  * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
0099  *   call_ast: whether ast should be called for this lock
0100  *   kick_thread: whether dlm_kick_thread should be called
0101  */
0102 static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
0103                        struct dlm_lock_resource *res,
0104                        struct dlm_lock *lock, int flags,
0105                        int type, int *call_ast,
0106                        int *kick_thread)
0107 {
0108     enum dlm_status status = DLM_NORMAL;
0109     struct dlm_lock *tmplock=NULL;
0110 
0111     assert_spin_locked(&res->spinlock);
0112 
0113     mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
0114          lock->ml.type, lock->ml.convert_type, type);
0115 
0116     spin_lock(&lock->spinlock);
0117 
0118     /* already converting? */
0119     if (lock->ml.convert_type != LKM_IVMODE) {
0120         mlog(ML_ERROR, "attempted to convert a lock with a lock "
0121              "conversion pending\n");
0122         status = DLM_DENIED;
0123         goto unlock_exit;
0124     }
0125 
0126     /* must be on grant queue to convert */
0127     if (!dlm_lock_on_list(&res->granted, lock)) {
0128         mlog(ML_ERROR, "attempted to convert a lock not on grant "
0129              "queue\n");
0130         status = DLM_DENIED;
0131         goto unlock_exit;
0132     }
0133 
0134     if (flags & LKM_VALBLK) {
0135         switch (lock->ml.type) {
0136             case LKM_EXMODE:
0137                 /* EX + LKM_VALBLK + convert == set lvb */
0138                 mlog(0, "will set lvb: converting %s->%s\n",
0139                      dlm_lock_mode_name(lock->ml.type),
0140                      dlm_lock_mode_name(type));
0141                 lock->lksb->flags |= DLM_LKSB_PUT_LVB;
0142                 break;
0143             case LKM_PRMODE:
0144             case LKM_NLMODE:
0145                 /* refetch if new level is not NL */
0146                 if (type > LKM_NLMODE) {
0147                     mlog(0, "will fetch new value into "
0148                          "lvb: converting %s->%s\n",
0149                          dlm_lock_mode_name(lock->ml.type),
0150                          dlm_lock_mode_name(type));
0151                     lock->lksb->flags |= DLM_LKSB_GET_LVB;
0152                 } else {
0153                     mlog(0, "will NOT fetch new value "
0154                          "into lvb: converting %s->%s\n",
0155                          dlm_lock_mode_name(lock->ml.type),
0156                          dlm_lock_mode_name(type));
0157                     flags &= ~(LKM_VALBLK);
0158                 }
0159                 break;
0160         }
0161     }
0162 
0163 
0164     /* in-place downconvert? */
0165     if (type <= lock->ml.type)
0166         goto grant;
0167 
0168     /* upconvert from here on */
0169     status = DLM_NORMAL;
0170     list_for_each_entry(tmplock, &res->granted, list) {
0171         if (tmplock == lock)
0172             continue;
0173         if (!dlm_lock_compatible(tmplock->ml.type, type))
0174             goto switch_queues;
0175     }
0176 
0177     list_for_each_entry(tmplock, &res->converting, list) {
0178         if (!dlm_lock_compatible(tmplock->ml.type, type))
0179             goto switch_queues;
0180         /* existing conversion requests take precedence */
0181         if (!dlm_lock_compatible(tmplock->ml.convert_type, type))
0182             goto switch_queues;
0183     }
0184 
0185     /* fall thru to grant */
0186 
0187 grant:
0188     mlog(0, "res %.*s, granting %s lock\n", res->lockname.len,
0189          res->lockname.name, dlm_lock_mode_name(type));
0190     /* immediately grant the new lock type */
0191     lock->lksb->status = DLM_NORMAL;
0192     if (lock->ml.node == dlm->node_num)
0193         mlog(0, "doing in-place convert for nonlocal lock\n");
0194     lock->ml.type = type;
0195     if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
0196         memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
0197 
0198     /*
0199      * Move the lock to the tail because it may be the only lock which has
0200      * an invalid lvb.
0201      */
0202     list_move_tail(&lock->list, &res->granted);
0203 
0204     status = DLM_NORMAL;
0205     *call_ast = 1;
0206     goto unlock_exit;
0207 
0208 switch_queues:
0209     if (flags & LKM_NOQUEUE) {
0210         mlog(0, "failed to convert NOQUEUE lock %.*s from "
0211              "%d to %d...\n", res->lockname.len, res->lockname.name,
0212              lock->ml.type, type);
0213         status = DLM_NOTQUEUED;
0214         goto unlock_exit;
0215     }
0216     mlog(0, "res %.*s, queueing...\n", res->lockname.len,
0217          res->lockname.name);
0218 
0219     lock->ml.convert_type = type;
0220     /* do not alter lock refcount.  switching lists. */
0221     list_move_tail(&lock->list, &res->converting);
0222 
0223 unlock_exit:
0224     spin_unlock(&lock->spinlock);
0225     if (status == DLM_DENIED) {
0226         __dlm_print_one_lock_resource(res);
0227     }
0228     if (status == DLM_NORMAL)
0229         *kick_thread = 1;
0230     return status;
0231 }
0232 
0233 void dlm_revert_pending_convert(struct dlm_lock_resource *res,
0234                 struct dlm_lock *lock)
0235 {
0236     /* do not alter lock refcount.  switching lists. */
0237     list_move_tail(&lock->list, &res->granted);
0238     lock->ml.convert_type = LKM_IVMODE;
0239     lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
0240 }
0241 
0242 /* messages the master site to do lock conversion
0243  * locking:
0244  *   caller needs:  none
0245  *   taken:         takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
0246  *   held on exit:  none
0247  * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
0248  */
0249 enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
0250                   struct dlm_lock_resource *res,
0251                   struct dlm_lock *lock, int flags, int type)
0252 {
0253     enum dlm_status status;
0254 
0255     mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
0256          lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
0257 
0258     spin_lock(&res->spinlock);
0259     if (res->state & DLM_LOCK_RES_RECOVERING) {
0260         mlog(0, "bailing out early since res is RECOVERING "
0261              "on secondary queue\n");
0262         /* __dlm_print_one_lock_resource(res); */
0263         status = DLM_RECOVERING;
0264         goto bail;
0265     }
0266     /* will exit this call with spinlock held */
0267     __dlm_wait_on_lockres(res);
0268 
0269     if (lock->ml.convert_type != LKM_IVMODE) {
0270         __dlm_print_one_lock_resource(res);
0271         mlog(ML_ERROR, "converting a remote lock that is already "
0272              "converting! (cookie=%u:%llu, conv=%d)\n",
0273              dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
0274              dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
0275              lock->ml.convert_type);
0276         status = DLM_DENIED;
0277         goto bail;
0278     }
0279 
0280     if (lock->ml.type == type && lock->ml.convert_type == LKM_IVMODE) {
0281         mlog(0, "last convert request returned DLM_RECOVERING, but "
0282              "owner has already queued and sent ast to me. res %.*s, "
0283              "(cookie=%u:%llu, type=%d, conv=%d)\n",
0284              res->lockname.len, res->lockname.name,
0285              dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
0286              dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
0287              lock->ml.type, lock->ml.convert_type);
0288         status = DLM_NORMAL;
0289         goto bail;
0290     }
0291 
0292     res->state |= DLM_LOCK_RES_IN_PROGRESS;
0293     /* move lock to local convert queue */
0294     /* do not alter lock refcount.  switching lists. */
0295     list_move_tail(&lock->list, &res->converting);
0296     lock->convert_pending = 1;
0297     lock->ml.convert_type = type;
0298 
0299     if (flags & LKM_VALBLK) {
0300         if (lock->ml.type == LKM_EXMODE) {
0301             flags |= LKM_PUT_LVB;
0302             lock->lksb->flags |= DLM_LKSB_PUT_LVB;
0303         } else {
0304             if (lock->ml.convert_type == LKM_NLMODE)
0305                 flags &= ~LKM_VALBLK;
0306             else {
0307                 flags |= LKM_GET_LVB;
0308                 lock->lksb->flags |= DLM_LKSB_GET_LVB;
0309             }
0310         }
0311     }
0312     spin_unlock(&res->spinlock);
0313 
0314     /* no locks held here.
0315      * need to wait for a reply as to whether it got queued or not. */
0316     status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
0317 
0318     spin_lock(&res->spinlock);
0319     res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0320     /* if it failed, move it back to granted queue.
0321      * if master returns DLM_NORMAL and then down before sending ast,
0322      * it may have already been moved to granted queue, reset to
0323      * DLM_RECOVERING and retry convert */
0324     if (status != DLM_NORMAL) {
0325         if (status != DLM_NOTQUEUED)
0326             dlm_error(status);
0327         dlm_revert_pending_convert(res, lock);
0328     } else if (!lock->convert_pending) {
0329         mlog(0, "%s: res %.*s, owner died and lock has been moved back "
0330                 "to granted list, retry convert.\n",
0331                 dlm->name, res->lockname.len, res->lockname.name);
0332         status = DLM_RECOVERING;
0333     }
0334 
0335     lock->convert_pending = 0;
0336 bail:
0337     spin_unlock(&res->spinlock);
0338 
0339     /* TODO: should this be a wake_one? */
0340     /* wake up any IN_PROGRESS waiters */
0341     wake_up(&res->wq);
0342 
0343     return status;
0344 }
0345 
0346 /* sends DLM_CONVERT_LOCK_MSG to master site
0347  * locking:
0348  *   caller needs:  none
0349  *   taken:         none
0350  *   held on exit:  none
0351  * returns: DLM_NOLOCKMGR, status from remote node
0352  */
0353 static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
0354                        struct dlm_lock_resource *res,
0355                        struct dlm_lock *lock, int flags, int type)
0356 {
0357     struct dlm_convert_lock convert;
0358     int tmpret;
0359     enum dlm_status ret;
0360     int status = 0;
0361     struct kvec vec[2];
0362     size_t veclen = 1;
0363 
0364     mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);
0365 
0366     memset(&convert, 0, sizeof(struct dlm_convert_lock));
0367     convert.node_idx = dlm->node_num;
0368     convert.requested_type = type;
0369     convert.cookie = lock->ml.cookie;
0370     convert.namelen = res->lockname.len;
0371     convert.flags = cpu_to_be32(flags);
0372     memcpy(convert.name, res->lockname.name, convert.namelen);
0373 
0374     vec[0].iov_len = sizeof(struct dlm_convert_lock);
0375     vec[0].iov_base = &convert;
0376 
0377     if (flags & LKM_PUT_LVB) {
0378         /* extra data to send if we are updating lvb */
0379         vec[1].iov_len = DLM_LVB_LEN;
0380         vec[1].iov_base = lock->lksb->lvb;
0381         veclen++;
0382     }
0383 
0384     tmpret = o2net_send_message_vec(DLM_CONVERT_LOCK_MSG, dlm->key,
0385                     vec, veclen, res->owner, &status);
0386     if (tmpret >= 0) {
0387         // successfully sent and received
0388         ret = status;  // this is already a dlm_status
0389         if (ret == DLM_RECOVERING) {
0390             mlog(0, "node %u returned DLM_RECOVERING from convert "
0391                  "message!\n", res->owner);
0392         } else if (ret == DLM_MIGRATING) {
0393             mlog(0, "node %u returned DLM_MIGRATING from convert "
0394                  "message!\n", res->owner);
0395         } else if (ret == DLM_FORWARD) {
0396             mlog(0, "node %u returned DLM_FORWARD from convert "
0397                  "message!\n", res->owner);
0398         } else if (ret != DLM_NORMAL && ret != DLM_NOTQUEUED)
0399             dlm_error(ret);
0400     } else {
0401         mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
0402              "node %u\n", tmpret, DLM_CONVERT_LOCK_MSG, dlm->key,
0403              res->owner);
0404         if (dlm_is_host_down(tmpret)) {
0405             /* instead of logging the same network error over
0406              * and over, sleep here and wait for the heartbeat
0407              * to notice the node is dead.  times out after 5s. */
0408             dlm_wait_for_node_death(dlm, res->owner,
0409                         DLM_NODE_DEATH_WAIT_MAX);
0410             ret = DLM_RECOVERING;
0411             mlog(0, "node %u died so returning DLM_RECOVERING "
0412                  "from convert message!\n", res->owner);
0413         } else {
0414             ret = dlm_err_to_dlm_status(tmpret);
0415         }
0416     }
0417 
0418     return ret;
0419 }
0420 
0421 /* handler for DLM_CONVERT_LOCK_MSG on master site
0422  * locking:
0423  *   caller needs:  none
0424  *   taken:         takes and drop res->spinlock
0425  *   held on exit:  none
0426  * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
0427  *          status from __dlmconvert_master
0428  */
0429 int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
0430                  void **ret_data)
0431 {
0432     struct dlm_ctxt *dlm = data;
0433     struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
0434     struct dlm_lock_resource *res = NULL;
0435     struct dlm_lock *lock = NULL;
0436     struct dlm_lock *tmp_lock;
0437     struct dlm_lockstatus *lksb;
0438     enum dlm_status status = DLM_NORMAL;
0439     u32 flags;
0440     int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
0441 
0442     if (!dlm_grab(dlm)) {
0443         dlm_error(DLM_REJECTED);
0444         return DLM_REJECTED;
0445     }
0446 
0447     mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
0448             "Domain %s not fully joined!\n", dlm->name);
0449 
0450     if (cnv->namelen > DLM_LOCKID_NAME_MAX) {
0451         status = DLM_IVBUFLEN;
0452         dlm_error(status);
0453         goto leave;
0454     }
0455 
0456     flags = be32_to_cpu(cnv->flags);
0457 
0458     if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
0459          (LKM_PUT_LVB|LKM_GET_LVB)) {
0460         mlog(ML_ERROR, "both PUT and GET lvb specified\n");
0461         status = DLM_BADARGS;
0462         goto leave;
0463     }
0464 
0465     mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
0466          (flags & LKM_GET_LVB ? "get lvb" : "none"));
0467 
0468     status = DLM_IVLOCKID;
0469     res = dlm_lookup_lockres(dlm, cnv->name, cnv->namelen);
0470     if (!res) {
0471         dlm_error(status);
0472         goto leave;
0473     }
0474 
0475     spin_lock(&res->spinlock);
0476     status = __dlm_lockres_state_to_status(res);
0477     if (status != DLM_NORMAL) {
0478         spin_unlock(&res->spinlock);
0479         dlm_error(status);
0480         goto leave;
0481     }
0482     list_for_each_entry(tmp_lock, &res->granted, list) {
0483         if (tmp_lock->ml.cookie == cnv->cookie &&
0484             tmp_lock->ml.node == cnv->node_idx) {
0485             lock = tmp_lock;
0486             dlm_lock_get(lock);
0487             break;
0488         }
0489     }
0490     spin_unlock(&res->spinlock);
0491     if (!lock) {
0492         status = DLM_IVLOCKID;
0493         mlog(ML_ERROR, "did not find lock to convert on grant queue! "
0494                    "cookie=%u:%llu\n",
0495              dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
0496              dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
0497         dlm_print_one_lock_resource(res);
0498         goto leave;
0499     }
0500 
0501     /* found the lock */
0502     lksb = lock->lksb;
0503 
0504     /* see if caller needed to get/put lvb */
0505     if (flags & LKM_PUT_LVB) {
0506         BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
0507         lksb->flags |= DLM_LKSB_PUT_LVB;
0508         memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
0509     } else if (flags & LKM_GET_LVB) {
0510         BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
0511         lksb->flags |= DLM_LKSB_GET_LVB;
0512     }
0513 
0514     spin_lock(&res->spinlock);
0515     status = __dlm_lockres_state_to_status(res);
0516     if (status == DLM_NORMAL) {
0517         __dlm_lockres_reserve_ast(res);
0518         ast_reserved = 1;
0519         res->state |= DLM_LOCK_RES_IN_PROGRESS;
0520         status = __dlmconvert_master(dlm, res, lock, flags,
0521                          cnv->requested_type,
0522                          &call_ast, &kick_thread);
0523         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0524         wake = 1;
0525     }
0526     spin_unlock(&res->spinlock);
0527     if (wake)
0528         wake_up(&res->wq);
0529 
0530     if (status != DLM_NORMAL) {
0531         if (status != DLM_NOTQUEUED)
0532             dlm_error(status);
0533         lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
0534     }
0535 
0536 leave:
0537     if (lock)
0538         dlm_lock_put(lock);
0539 
0540     /* either queue the ast or release it, if reserved */
0541     if (call_ast)
0542         dlm_queue_ast(dlm, lock);
0543     else if (ast_reserved)
0544         dlm_lockres_release_ast(dlm, res);
0545 
0546     if (kick_thread)
0547         dlm_kick_thread(dlm, res);
0548 
0549     if (res)
0550         dlm_lockres_put(res);
0551 
0552     dlm_put(dlm);
0553 
0554     return status;
0555 }