Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * dlmlock.c
0004  *
0005  * underlying calls for lock creation
0006  *
0007  * Copyright (C) 2004 Oracle.  All rights reserved.
0008  */
0009 
0010 
0011 #include <linux/module.h>
0012 #include <linux/fs.h>
0013 #include <linux/types.h>
0014 #include <linux/slab.h>
0015 #include <linux/highmem.h>
0016 #include <linux/init.h>
0017 #include <linux/sysctl.h>
0018 #include <linux/random.h>
0019 #include <linux/blkdev.h>
0020 #include <linux/socket.h>
0021 #include <linux/inet.h>
0022 #include <linux/spinlock.h>
0023 #include <linux/delay.h>
0024 
0025 
0026 #include "../cluster/heartbeat.h"
0027 #include "../cluster/nodemanager.h"
0028 #include "../cluster/tcp.h"
0029 
0030 #include "dlmapi.h"
0031 #include "dlmcommon.h"
0032 
0033 #include "dlmconvert.h"
0034 
0035 #define MLOG_MASK_PREFIX ML_DLM
0036 #include "../cluster/masklog.h"
0037 
0038 static struct kmem_cache *dlm_lock_cache;
0039 
0040 static DEFINE_SPINLOCK(dlm_cookie_lock);
0041 static u64 dlm_next_cookie = 1;
0042 
0043 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
0044                            struct dlm_lock_resource *res,
0045                            struct dlm_lock *lock, int flags);
0046 static void dlm_init_lock(struct dlm_lock *newlock, int type,
0047               u8 node, u64 cookie);
0048 static void dlm_lock_release(struct kref *kref);
0049 static void dlm_lock_detach_lockres(struct dlm_lock *lock);
0050 
0051 int dlm_init_lock_cache(void)
0052 {
0053     dlm_lock_cache = kmem_cache_create("o2dlm_lock",
0054                        sizeof(struct dlm_lock),
0055                        0, SLAB_HWCACHE_ALIGN, NULL);
0056     if (dlm_lock_cache == NULL)
0057         return -ENOMEM;
0058     return 0;
0059 }
0060 
/* Tear down the slab cache created by dlm_init_lock_cache().
 * Safe to call even if cache creation failed, since
 * kmem_cache_destroy() tolerates a NULL cache pointer. */
void dlm_destroy_lock_cache(void)
{
    kmem_cache_destroy(dlm_lock_cache);
}
0065 
0066 /* Tell us whether we can grant a new lock request.
0067  * locking:
0068  *   caller needs:  res->spinlock
0069  *   taken:         none
0070  *   held on exit:  none
0071  * returns: 1 if the lock can be granted, 0 otherwise.
0072  */
0073 static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
0074                   struct dlm_lock *lock)
0075 {
0076     struct dlm_lock *tmplock;
0077 
0078     list_for_each_entry(tmplock, &res->granted, list) {
0079         if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
0080             return 0;
0081     }
0082 
0083     list_for_each_entry(tmplock, &res->converting, list) {
0084         if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
0085             return 0;
0086         if (!dlm_lock_compatible(tmplock->ml.convert_type,
0087                      lock->ml.type))
0088             return 0;
0089     }
0090 
0091     return 1;
0092 }
0093 
/* performs lock creation at the lockres master site
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_NOTQUEUED
 */
static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
                      struct dlm_lock_resource *res,
                      struct dlm_lock *lock, int flags)
{
    int call_ast = 0, kick_thread = 0;
    enum dlm_status status = DLM_NORMAL;

    mlog(0, "type=%d\n", lock->ml.type);

    spin_lock(&res->spinlock);
    /* if called from dlm_create_lock_handler, need to
     * ensure it will not sleep in dlm_wait_on_lockres */
    status = __dlm_lockres_state_to_status(res);
    if (status != DLM_NORMAL &&
        lock->ml.node != dlm->node_num) {
        /* erf.  state changed after lock was dropped. */
        spin_unlock(&res->spinlock);
        dlm_error(status);
        return status;
    }
    /* local requests may block here until recovery/migration ends */
    __dlm_wait_on_lockres(res);
    /* reserve an AST slot now; released below if we do not queue one */
    __dlm_lockres_reserve_ast(res);

    if (dlm_can_grant_new_lock(res, lock)) {
        mlog(0, "I can grant this lock right away\n");
        /* got it right away */
        lock->lksb->status = DLM_NORMAL;
        status = DLM_NORMAL;
        /* ref held by the granted queue entry */
        dlm_lock_get(lock);
        list_add_tail(&lock->list, &res->granted);

        /* for the recovery lock, we can't allow the ast
         * to be queued since the dlmthread is already
         * frozen.  but the recovery lock is always locked
         * with LKM_NOQUEUE so we do not need the ast in
         * this special case */
        if (!dlm_is_recovery_lock(res->lockname.name,
                      res->lockname.len)) {
            kick_thread = 1;
            call_ast = 1;
        } else {
            mlog(0, "%s: returning DLM_NORMAL to "
                 "node %u for reco lock\n", dlm->name,
                 lock->ml.node);
        }
    } else {
        /* for NOQUEUE request, unless we get the
         * lock right away, return DLM_NOTQUEUED */
        if (flags & LKM_NOQUEUE) {
            status = DLM_NOTQUEUED;
            if (dlm_is_recovery_lock(res->lockname.name,
                         res->lockname.len)) {
                mlog(0, "%s: returning NOTQUEUED to "
                     "node %u for reco lock\n", dlm->name,
                     lock->ml.node);
            }
        } else {
            /* queue on the blocked list; dlmthread grants later */
            status = DLM_NORMAL;
            dlm_lock_get(lock);
            list_add_tail(&lock->list, &res->blocked);
            kick_thread = 1;
        }
    }

    spin_unlock(&res->spinlock);
    wake_up(&res->wq);

    /* either queue the ast or release it */
    if (call_ast)
        dlm_queue_ast(dlm, lock);
    else
        dlm_lockres_release_ast(dlm, res);

    dlm_lockres_calc_usage(dlm, res);
    if (kick_thread)
        dlm_kick_thread(dlm, res);

    return status;
}
0180 
/* Undo a lock request that failed at the master: take the lock back
 * off whichever local queue it was placed on and clear the LVB-fetch
 * request flag.  Caller is expected to hold res->spinlock (this is
 * called from under it in dlmlock_remote). */
void dlm_revert_pending_lock(struct dlm_lock_resource *res,
                 struct dlm_lock *lock)
{
    /* remove from local queue if it failed */
    list_del_init(&lock->list);
    lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
}
0188 
0189 
/*
 * performs lock creation when this node is NOT the lockres master:
 * queues the lock locally, then sends the request to the master and
 * reconciles the local queues with the reply.
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_DENIED, DLM_RECOVERING, or net status
 */
static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
                      struct dlm_lock_resource *res,
                      struct dlm_lock *lock, int flags)
{
    enum dlm_status status = DLM_DENIED;
    int lockres_changed = 1;

    mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
         lock->ml.type, res->lockname.len,
         res->lockname.name, flags);

    /*
     * Wait if resource is getting recovered, remastered, etc.
     * If the resource was remastered and new owner is self, then exit.
     */
    spin_lock(&res->spinlock);
    __dlm_wait_on_lockres(res);
    if (res->owner == dlm->node_num) {
        /* remastered to us: caller must retry via the master path */
        spin_unlock(&res->spinlock);
        return DLM_RECOVERING;
    }
    res->state |= DLM_LOCK_RES_IN_PROGRESS;

    /* add lock to local (secondary) queue */
    dlm_lock_get(lock);
    list_add_tail(&lock->list, &res->blocked);
    /* mark in-flight so recovery knows this lock awaits a reply */
    lock->lock_pending = 1;
    spin_unlock(&res->spinlock);

    /* spec seems to say that you will get DLM_NORMAL when the lock
     * has been queued, meaning we need to wait for a reply here. */
    status = dlm_send_remote_lock_request(dlm, res, lock, flags);

    spin_lock(&res->spinlock);
    res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
    lock->lock_pending = 0;
    if (status != DLM_NORMAL) {
        if (status == DLM_RECOVERING &&
            dlm_is_recovery_lock(res->lockname.name,
                     res->lockname.len)) {
            /* recovery lock was mastered by dead node.
             * we need to have calc_usage shoot down this
             * lockres and completely remaster it. */
            mlog(0, "%s: recovery lock was owned by "
                 "dead node %u, remaster it now.\n",
                 dlm->name, res->owner);
        } else if (status != DLM_NOTQUEUED) {
            /*
             * DO NOT call calc_usage, as this would unhash
             * the remote lockres before we ever get to use
             * it.  treat as if we never made any change to
             * the lockres.
             */
            lockres_changed = 0;
            dlm_error(status);
        }
        /* drop the blocked-queue entry and its ref */
        dlm_revert_pending_lock(res, lock);
        dlm_lock_put(lock);
    } else if (dlm_is_recovery_lock(res->lockname.name,
                    res->lockname.len)) {
        /* special case for the $RECOVERY lock.
         * there will never be an AST delivered to put
         * this lock on the proper secondary queue
         * (granted), so do it manually. */
        mlog(0, "%s: $RECOVERY lock for this node (%u) is "
             "mastered by %u; got lock, manually granting (no ast)\n",
             dlm->name, dlm->node_num, res->owner);
        list_move_tail(&lock->list, &res->granted);
    }
    spin_unlock(&res->spinlock);

    if (lockres_changed)
        dlm_lockres_calc_usage(dlm, res);

    wake_up(&res->wq);
    return status;
}
0274 
0275 
/* for remote lock creation.
 * Marshals a dlm_create_lock message and sends it to the lockres
 * owner, translating network failure into a dlm status.
 * locking:
 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
 *   taken:         none
 *   held on exit:  none
 * returns: DLM_NOLOCKMGR, or net status
 */
static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
                           struct dlm_lock_resource *res,
                           struct dlm_lock *lock, int flags)
{
    struct dlm_create_lock create;
    int tmpret, status = 0;
    enum dlm_status ret;

    memset(&create, 0, sizeof(create));
    create.node_idx = dlm->node_num;
    create.requested_type = lock->ml.type;
    /* cookie is already in wire (big-endian) order; see dlm_init_lock */
    create.cookie = lock->ml.cookie;
    create.namelen = res->lockname.len;
    create.flags = cpu_to_be32(flags);
    memcpy(create.name, res->lockname.name, create.namelen);

    /* status receives the master's dlm status via the net layer */
    tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
                    sizeof(create), res->owner, &status);
    if (tmpret >= 0) {
        ret = status;
        if (ret == DLM_REJECTED) {
            mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
                 "owned by node %u. That node is coming back up "
                 "currently.\n", dlm->name, create.namelen,
                 create.name, res->owner);
            dlm_print_one_lock_resource(res);
            BUG();
        }
    } else {
        mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
             "node %u\n", dlm->name, create.namelen, create.name,
             tmpret, res->owner);
        if (dlm_is_host_down(tmpret))
            ret = DLM_RECOVERING;
        else
            ret = dlm_err_to_dlm_status(tmpret);
    }

    return ret;
}
0323 
/* Take a reference on a dlm_lock. */
void dlm_lock_get(struct dlm_lock *lock)
{
    kref_get(&lock->lock_refs);
}
0328 
/* Drop a reference on a dlm_lock; frees it via dlm_lock_release()
 * when the last reference goes away. */
void dlm_lock_put(struct dlm_lock *lock)
{
    kref_put(&lock->lock_refs, dlm_lock_release);
}
0333 
0334 static void dlm_lock_release(struct kref *kref)
0335 {
0336     struct dlm_lock *lock;
0337 
0338     lock = container_of(kref, struct dlm_lock, lock_refs);
0339 
0340     BUG_ON(!list_empty(&lock->list));
0341     BUG_ON(!list_empty(&lock->ast_list));
0342     BUG_ON(!list_empty(&lock->bast_list));
0343     BUG_ON(lock->ast_pending);
0344     BUG_ON(lock->bast_pending);
0345 
0346     dlm_lock_detach_lockres(lock);
0347 
0348     if (lock->lksb_kernel_allocated) {
0349         mlog(0, "freeing kernel-allocated lksb\n");
0350         kfree(lock->lksb);
0351     }
0352     kmem_cache_free(dlm_lock_cache, lock);
0353 }
0354 
/* associate a lock with it's lockres, getting a ref on the lockres */
void dlm_lock_attach_lockres(struct dlm_lock *lock,
                 struct dlm_lock_resource *res)
{
    /* the lock holds this lockres ref until dlm_lock_detach_lockres() */
    dlm_lockres_get(res);
    lock->lockres = res;
}
0362 
0363 /* drop ref on lockres, if there is still one associated with lock */
0364 static void dlm_lock_detach_lockres(struct dlm_lock *lock)
0365 {
0366     struct dlm_lock_resource *res;
0367 
0368     res = lock->lockres;
0369     if (res) {
0370         lock->lockres = NULL;
0371         mlog(0, "removing lock's lockres reference\n");
0372         dlm_lockres_put(res);
0373     }
0374 }
0375 
0376 static void dlm_init_lock(struct dlm_lock *newlock, int type,
0377               u8 node, u64 cookie)
0378 {
0379     INIT_LIST_HEAD(&newlock->list);
0380     INIT_LIST_HEAD(&newlock->ast_list);
0381     INIT_LIST_HEAD(&newlock->bast_list);
0382     spin_lock_init(&newlock->spinlock);
0383     newlock->ml.type = type;
0384     newlock->ml.convert_type = LKM_IVMODE;
0385     newlock->ml.highest_blocked = LKM_IVMODE;
0386     newlock->ml.node = node;
0387     newlock->ml.pad1 = 0;
0388     newlock->ml.list = 0;
0389     newlock->ml.flags = 0;
0390     newlock->ast = NULL;
0391     newlock->bast = NULL;
0392     newlock->astdata = NULL;
0393     newlock->ml.cookie = cpu_to_be64(cookie);
0394     newlock->ast_pending = 0;
0395     newlock->bast_pending = 0;
0396     newlock->convert_pending = 0;
0397     newlock->lock_pending = 0;
0398     newlock->unlock_pending = 0;
0399     newlock->cancel_pending = 0;
0400     newlock->lksb_kernel_allocated = 0;
0401 
0402     kref_init(&newlock->lock_refs);
0403 }
0404 
0405 struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
0406                    struct dlm_lockstatus *lksb)
0407 {
0408     struct dlm_lock *lock;
0409     int kernel_allocated = 0;
0410 
0411     lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
0412     if (!lock)
0413         return NULL;
0414 
0415     if (!lksb) {
0416         /* zero memory only if kernel-allocated */
0417         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
0418         if (!lksb) {
0419             kmem_cache_free(dlm_lock_cache, lock);
0420             return NULL;
0421         }
0422         kernel_allocated = 1;
0423     }
0424 
0425     dlm_init_lock(lock, type, node, cookie);
0426     if (kernel_allocated)
0427         lock->lksb_kernel_allocated = 1;
0428     lock->lksb = lksb;
0429     lksb->lockid = lock;
0430     return lock;
0431 }
0432 
/* handler for lock creation net message
 * Runs on the lockres master when a remote node asks to create a lock;
 * validates the request, builds a local dlm_lock for the remote node
 * and feeds it through dlmlock_master().
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
 */
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
                void **ret_data)
{
    struct dlm_ctxt *dlm = data;
    struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
    struct dlm_lock_resource *res = NULL;
    struct dlm_lock *newlock = NULL;
    struct dlm_lockstatus *lksb = NULL;
    enum dlm_status status = DLM_NORMAL;
    char *name;
    unsigned int namelen;

    BUG_ON(!dlm);

    /* hold the domain across the handler */
    if (!dlm_grab(dlm))
        return DLM_REJECTED;

    name = create->name;
    namelen = create->namelen;
    status = DLM_REJECTED;
    if (!dlm_domain_fully_joined(dlm)) {
        mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
             "sending a create_lock message for lock %.*s!\n",
             dlm->name, create->node_idx, namelen, name);
        dlm_error(status);
        goto leave;
    }

    status = DLM_IVBUFLEN;
    if (namelen > DLM_LOCKID_NAME_MAX) {
        dlm_error(status);
        goto leave;
    }

    /* lock is created on behalf of the requesting node; NULL lksb
     * makes dlm_new_lock() allocate one owned by the lock */
    status = DLM_SYSERR;
    newlock = dlm_new_lock(create->requested_type,
                   create->node_idx,
                   be64_to_cpu(create->cookie), NULL);
    if (!newlock) {
        dlm_error(status);
        goto leave;
    }

    lksb = newlock->lksb;

    if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
        lksb->flags |= DLM_LKSB_GET_LVB;
        mlog(0, "set DLM_LKSB_GET_LVB flag\n");
    }

    status = DLM_IVLOCKID;
    res = dlm_lookup_lockres(dlm, name, namelen);
    if (!res) {
        dlm_error(status);
        goto leave;
    }

    spin_lock(&res->spinlock);
    status = __dlm_lockres_state_to_status(res);
    spin_unlock(&res->spinlock);

    if (status != DLM_NORMAL) {
        /* requester will retry; see dlmlock_remote() */
        mlog(0, "lockres recovering/migrating/in-progress\n");
        goto leave;
    }

    dlm_lock_attach_lockres(newlock, res);

    status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
leave:
    /* on any failure, drop the ref (and lksb) taken by dlm_new_lock() */
    if (status != DLM_NORMAL)
        if (newlock)
            dlm_lock_put(newlock);

    if (res)
        dlm_lockres_put(res);

    dlm_put(dlm);

    return status;
}
0521 
0522 
0523 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
0524 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
0525 {
0526     u64 tmpnode = node_num;
0527 
0528     /* shift single byte of node num into top 8 bits */
0529     tmpnode <<= 56;
0530 
0531     spin_lock(&dlm_cookie_lock);
0532     *cookie = (dlm_next_cookie | tmpnode);
0533     if (++dlm_next_cookie & 0xff00000000000000ull) {
0534         mlog(0, "This node's cookie will now wrap!\n");
0535         dlm_next_cookie = 1;
0536     }
0537     spin_unlock(&dlm_cookie_lock);
0538 }
0539 
/* Public entry point for lock creation and conversion.
 * With LKM_CONVERT set, converts the existing lock referenced by
 * lksb->lockid to @mode; otherwise creates a new lock on the named
 * lockres.  Both paths retry internally while the lockres is
 * recovering/migrating.  Returns DLM_NORMAL on success or a dlm
 * status describing the failure (also written to lksb->status). */
enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
            struct dlm_lockstatus *lksb, int flags,
            const char *name, int namelen, dlm_astlockfunc_t *ast,
            void *data, dlm_bastlockfunc_t *bast)
{
    enum dlm_status status;
    struct dlm_lock_resource *res = NULL;
    struct dlm_lock *lock = NULL;
    int convert = 0, recovery = 0;

    /* yes this function is a mess.
     * TODO: clean this up.  lots of common code in the
     *       lock and convert paths, especially in the retry blocks */
    if (!lksb) {
        dlm_error(DLM_BADARGS);
        return DLM_BADARGS;
    }

    status = DLM_BADPARAM;
    if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
        dlm_error(status);
        goto error;
    }

    if (flags & ~LKM_VALID_FLAGS) {
        dlm_error(status);
        goto error;
    }

    convert = (flags & LKM_CONVERT);
    recovery = (flags & LKM_RECOVERY);

    /* LKM_RECOVERY is only valid for a fresh lock on the $RECOVERY
     * lockres, never for a conversion */
    if (recovery &&
        (!dlm_is_recovery_lock(name, namelen) || convert) ) {
        dlm_error(status);
        goto error;
    }
    if (convert && (flags & LKM_LOCAL)) {
        mlog(ML_ERROR, "strange LOCAL convert request!\n");
        goto error;
    }

    if (convert) {
        /* CONVERT request */

        /* if converting, must pass in a valid dlm_lock */
        lock = lksb->lockid;
        if (!lock) {
            mlog(ML_ERROR, "NULL lock pointer in convert "
                 "request\n");
            goto error;
        }

        res = lock->lockres;
        if (!res) {
            mlog(ML_ERROR, "NULL lockres pointer in convert "
                 "request\n");
            goto error;
        }
        /* balanced by the dlm_lockres_put() at the bottom */
        dlm_lockres_get(res);

        /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
         * static after the original lock call.  convert requests will
         * ensure that everything is the same, or return DLM_BADARGS.
         * this means that DLM_DENIED_NOASTS will never be returned.
         */
        if (lock->lksb != lksb || lock->ast != ast ||
            lock->bast != bast || lock->astdata != data) {
            status = DLM_BADARGS;
            mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
                 "astdata=%p\n", lksb, ast, bast, data);
            mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
                 "astdata=%p\n", lock->lksb, lock->ast,
                 lock->bast, lock->astdata);
            goto error;
        }
retry_convert:
        dlm_wait_for_recovery(dlm);

        if (res->owner == dlm->node_num)
            status = dlmconvert_master(dlm, res, lock, flags, mode);
        else
            status = dlmconvert_remote(dlm, res, lock, flags, mode);
        if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
            status == DLM_FORWARD) {
            /* for now, see how this works without sleeping
             * and just retry right away.  I suspect the reco
             * or migration will complete fast enough that
             * no waiting will be necessary */
            mlog(0, "retrying convert with migration/recovery/"
                 "in-progress\n");
            msleep(100);
            goto retry_convert;
        }
    } else {
        u64 tmpcookie;

        /* LOCK request */
        status = DLM_BADARGS;
        if (!name) {
            dlm_error(status);
            goto error;
        }

        status = DLM_IVBUFLEN;
        if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
            dlm_error(status);
            goto error;
        }

        dlm_get_next_cookie(dlm->node_num, &tmpcookie);
        lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
        if (!lock) {
            dlm_error(status);
            goto error;
        }

        /* the $RECOVERY lock must be acquirable during recovery */
        if (!recovery)
            dlm_wait_for_recovery(dlm);

        /* find or create the lock resource */
        res = dlm_get_lock_resource(dlm, name, namelen, flags);
        if (!res) {
            status = DLM_IVLOCKID;
            dlm_error(status);
            goto error;
        }

        mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
        mlog(0, "creating lock: lock=%p res=%p\n", lock, res);

        dlm_lock_attach_lockres(lock, res);
        lock->ast = ast;
        lock->bast = bast;
        lock->astdata = data;

retry_lock:
        if (flags & LKM_VALBLK) {
            mlog(0, "LKM_VALBLK passed by caller\n");

            /* LVB requests for non PR, PW or EX locks are
             * ignored. */
            if (mode < LKM_PRMODE)
                flags &= ~LKM_VALBLK;
            else {
                flags |= LKM_GET_LVB;
                lock->lksb->flags |= DLM_LKSB_GET_LVB;
            }
        }

        if (res->owner == dlm->node_num)
            status = dlmlock_master(dlm, res, lock, flags);
        else
            status = dlmlock_remote(dlm, res, lock, flags);

        if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
            status == DLM_FORWARD) {
            msleep(100);
            if (recovery) {
                if (status != DLM_RECOVERING)
                    goto retry_lock;
                /* wait to see the node go down, then
                 * drop down and allow the lockres to
                 * get cleaned up.  need to remaster. */
                dlm_wait_for_node_death(dlm, res->owner,
                        DLM_NODE_DEATH_WAIT_MAX);
            } else {
                dlm_wait_for_recovery(dlm);
                goto retry_lock;
            }
        }

        /* Inflight taken in dlm_get_lock_resource() is dropped here */
        spin_lock(&res->spinlock);
        dlm_lockres_drop_inflight_ref(dlm, res);
        spin_unlock(&res->spinlock);

        dlm_lockres_calc_usage(dlm, res);
        dlm_kick_thread(dlm, res);

        if (status != DLM_NORMAL) {
            lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
            if (status != DLM_NOTQUEUED)
                dlm_error(status);
            goto error;
        }
    }

error:
    if (status != DLM_NORMAL) {
        /* a convert failure must not drop the original lock's ref */
        if (lock && !convert)
            dlm_lock_put(lock);
        // this is kind of unnecessary
        lksb->status = status;
    }

    /* put lockres ref from the convert path
     * or from dlm_get_lock_resource */
    if (res)
        dlm_lockres_put(res);

    return status;
}
EXPORT_SYMBOL_GPL(dlmlock);