Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * userdlm.c
0004  *
0005  * Code which implements the kernel side of a minimal userspace
0006  * interface to our DLM.
0007  *
0008  * Many of the functions here are pared down versions of dlmglue.c
0009  * functions.
0010  *
0011  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
0012  */
0013 
0014 #include <linux/signal.h>
0015 #include <linux/sched/signal.h>
0016 
0017 #include <linux/module.h>
0018 #include <linux/fs.h>
0019 #include <linux/types.h>
0020 #include <linux/crc32.h>
0021 
0022 #include "../ocfs2_lockingver.h"
0023 #include "../stackglue.h"
0024 #include "userdlm.h"
0025 
0026 #define MLOG_MASK_PREFIX ML_DLMFS
0027 #include "../cluster/masklog.h"
0028 
0029 
0030 static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
0031 {
0032     return container_of(lksb, struct user_lock_res, l_lksb);
0033 }
0034 
0035 static inline int user_check_wait_flag(struct user_lock_res *lockres,
0036                        int flag)
0037 {
0038     int ret;
0039 
0040     spin_lock(&lockres->l_lock);
0041     ret = lockres->l_flags & flag;
0042     spin_unlock(&lockres->l_lock);
0043 
0044     return ret;
0045 }
0046 
0047 static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
0048 
0049 {
0050     wait_event(lockres->l_event,
0051            !user_check_wait_flag(lockres, USER_LOCK_BUSY));
0052 }
0053 
0054 static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
0055 
0056 {
0057     wait_event(lockres->l_event,
0058            !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
0059 }
0060 
0061 /* I heart container_of... */
0062 static inline struct ocfs2_cluster_connection *
0063 cluster_connection_from_user_lockres(struct user_lock_res *lockres)
0064 {
0065     struct dlmfs_inode_private *ip;
0066 
0067     ip = container_of(lockres,
0068               struct dlmfs_inode_private,
0069               ip_lockres);
0070     return ip->ip_conn;
0071 }
0072 
0073 static struct inode *
0074 user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
0075 {
0076     struct dlmfs_inode_private *ip;
0077 
0078     ip = container_of(lockres,
0079               struct dlmfs_inode_private,
0080               ip_lockres);
0081     return &ip->ip_vfs_inode;
0082 }
0083 
0084 static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
0085 {
0086     spin_lock(&lockres->l_lock);
0087     lockres->l_flags &= ~USER_LOCK_BUSY;
0088     spin_unlock(&lockres->l_lock);
0089 }
0090 
/*
 * Log a DLM failure at ML_ERROR: the status code, the name of the call
 * that failed, and the resource name (printed with an explicit length).
 *
 * Arguments are parenthesized so that any expression can be passed
 * safely. Note that _lockres is still evaluated twice, so it must not
 * have side effects.
 */
#define user_log_dlm_error(_func, _stat, _lockres) do {         \
    mlog(ML_ERROR, "Dlm error %d while calling %s on "      \
        "resource %.*s\n", (_stat), (_func),            \
        (_lockres)->l_namelen, (_lockres)->l_name);     \
} while (0)
0096 
0097 /* WARNING: This function lives in a world where the only three lock
0098  * levels are EX, PR, and NL. It *will* have to be adjusted when more
0099  * lock types are added. */
0100 static inline int user_highest_compat_lock_level(int level)
0101 {
0102     int new_level = DLM_LOCK_EX;
0103 
0104     if (level == DLM_LOCK_EX)
0105         new_level = DLM_LOCK_NL;
0106     else if (level == DLM_LOCK_PR)
0107         new_level = DLM_LOCK_PR;
0108     return new_level;
0109 }
0110 
0111 static void user_ast(struct ocfs2_dlm_lksb *lksb)
0112 {
0113     struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
0114     int status;
0115 
0116     mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
0117          lockres->l_namelen, lockres->l_name, lockres->l_level,
0118          lockres->l_requested);
0119 
0120     spin_lock(&lockres->l_lock);
0121 
0122     status = ocfs2_dlm_lock_status(&lockres->l_lksb);
0123     if (status) {
0124         mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
0125              status, lockres->l_namelen, lockres->l_name);
0126         spin_unlock(&lockres->l_lock);
0127         return;
0128     }
0129 
0130     mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
0131             "Lockres %.*s, requested ivmode. flags 0x%x\n",
0132             lockres->l_namelen, lockres->l_name, lockres->l_flags);
0133 
0134     /* we're downconverting. */
0135     if (lockres->l_requested < lockres->l_level) {
0136         if (lockres->l_requested <=
0137             user_highest_compat_lock_level(lockres->l_blocking)) {
0138             lockres->l_blocking = DLM_LOCK_NL;
0139             lockres->l_flags &= ~USER_LOCK_BLOCKED;
0140         }
0141     }
0142 
0143     lockres->l_level = lockres->l_requested;
0144     lockres->l_requested = DLM_LOCK_IV;
0145     lockres->l_flags |= USER_LOCK_ATTACHED;
0146     lockres->l_flags &= ~USER_LOCK_BUSY;
0147 
0148     spin_unlock(&lockres->l_lock);
0149 
0150     wake_up(&lockres->l_event);
0151 }
0152 
0153 static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
0154 {
0155     struct inode *inode;
0156     inode = user_dlm_inode_from_user_lockres(lockres);
0157     if (!igrab(inode))
0158         BUG();
0159 }
0160 
0161 static void user_dlm_unblock_lock(struct work_struct *work);
0162 
0163 static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
0164 {
0165     if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
0166         user_dlm_grab_inode_ref(lockres);
0167 
0168         INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);
0169 
0170         queue_work(user_dlm_worker, &lockres->l_work);
0171         lockres->l_flags |= USER_LOCK_QUEUED;
0172     }
0173 }
0174 
0175 static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
0176 {
0177     int queue = 0;
0178 
0179     if (!(lockres->l_flags & USER_LOCK_BLOCKED))
0180         return;
0181 
0182     switch (lockres->l_blocking) {
0183     case DLM_LOCK_EX:
0184         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
0185             queue = 1;
0186         break;
0187     case DLM_LOCK_PR:
0188         if (!lockres->l_ex_holders)
0189             queue = 1;
0190         break;
0191     default:
0192         BUG();
0193     }
0194 
0195     if (queue)
0196         __user_dlm_queue_lockres(lockres);
0197 }
0198 
0199 static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
0200 {
0201     struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
0202 
0203     mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
0204          lockres->l_namelen, lockres->l_name, level, lockres->l_level);
0205 
0206     spin_lock(&lockres->l_lock);
0207     lockres->l_flags |= USER_LOCK_BLOCKED;
0208     if (level > lockres->l_blocking)
0209         lockres->l_blocking = level;
0210 
0211     __user_dlm_queue_lockres(lockres);
0212     spin_unlock(&lockres->l_lock);
0213 
0214     wake_up(&lockres->l_event);
0215 }
0216 
/*
 * Unlock AST callback (installed as lp_unlock_ast in user_dlm_lproto).
 *
 * Handles three cases, decided under l_lock:
 *  - teardown without a cancel in flight: the granted level is reset
 *    to DLM_LOCK_IV;
 *  - status == DLM_CANCELGRANT: only USER_LOCK_IN_CANCEL is cleared and
 *    USER_LOCK_BUSY is left untouched;
 *  - otherwise (a cancel completed): the pending request is dropped,
 *    USER_LOCK_IN_CANCEL is cleared and, if still blocked, the
 *    resource is re-queued for the unblock worker.
 * Except in the CANCELGRANT case USER_LOCK_BUSY is cleared; l_event is
 * always woken.
 */
static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
    struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

    mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
         lockres->l_namelen, lockres->l_name, lockres->l_flags);

    /* A non-zero status is logged but does not stop processing. */
    if (status)
        mlog(ML_ERROR, "dlm returns status %d\n", status);

    spin_lock(&lockres->l_lock);
    /* The teardown flag gets set early during the unlock process,
     * so test the cancel flag to make sure that this ast isn't
     * for a concurrent cancel. */
    if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
        && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
        lockres->l_level = DLM_LOCK_IV;
    } else if (status == DLM_CANCELGRANT) {
        /* We tried to cancel a convert request, but it was
         * already granted. Don't clear the busy flag - the
         * ast should've done this already. */
        BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
        lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
        goto out_noclear;
    } else {
        BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
        /* Cancel succeeded, we want to re-queue */
        /* Forget the upconvert request that was just cancelled. */
        lockres->l_requested = DLM_LOCK_IV;
        lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
        /* we want the unblock thread to look at it again
         * now. */
        if (lockres->l_flags & USER_LOCK_BLOCKED)
            __user_dlm_queue_lockres(lockres);
    }

    lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
    spin_unlock(&lockres->l_lock);

    wake_up(&lockres->l_event);
}
0260 
0261 /*
0262  * This is the userdlmfs locking protocol version.
0263  *
0264  * See fs/ocfs2/dlmglue.c for more details on locking versions.
0265  */
0266 static struct ocfs2_locking_protocol user_dlm_lproto = {
0267     .lp_max_version = {
0268         .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
0269         .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
0270     },
0271     .lp_lock_ast        = user_ast,
0272     .lp_blocking_ast    = user_bast,
0273     .lp_unlock_ast      = user_unlock_ast,
0274 };
0275 
/* Release one reference on the inode that owns this lock resource. */
static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
    iput(user_dlm_inode_from_user_lockres(lockres));
}
0282 
0283 static void user_dlm_unblock_lock(struct work_struct *work)
0284 {
0285     int new_level, status;
0286     struct user_lock_res *lockres =
0287         container_of(work, struct user_lock_res, l_work);
0288     struct ocfs2_cluster_connection *conn =
0289         cluster_connection_from_user_lockres(lockres);
0290 
0291     mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
0292 
0293     spin_lock(&lockres->l_lock);
0294 
0295     mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
0296             "Lockres %.*s, flags 0x%x\n",
0297             lockres->l_namelen, lockres->l_name, lockres->l_flags);
0298 
0299     /* notice that we don't clear USER_LOCK_BLOCKED here. If it's
0300      * set, we want user_ast clear it. */
0301     lockres->l_flags &= ~USER_LOCK_QUEUED;
0302 
0303     /* It's valid to get here and no longer be blocked - if we get
0304      * several basts in a row, we might be queued by the first
0305      * one, the unblock thread might run and clear the queued
0306      * flag, and finally we might get another bast which re-queues
0307      * us before our ast for the downconvert is called. */
0308     if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
0309         mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
0310              lockres->l_namelen, lockres->l_name);
0311         spin_unlock(&lockres->l_lock);
0312         goto drop_ref;
0313     }
0314 
0315     if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
0316         mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
0317              lockres->l_namelen, lockres->l_name);
0318         spin_unlock(&lockres->l_lock);
0319         goto drop_ref;
0320     }
0321 
0322     if (lockres->l_flags & USER_LOCK_BUSY) {
0323         if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
0324             mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
0325                  lockres->l_namelen, lockres->l_name);
0326             spin_unlock(&lockres->l_lock);
0327             goto drop_ref;
0328         }
0329 
0330         lockres->l_flags |= USER_LOCK_IN_CANCEL;
0331         spin_unlock(&lockres->l_lock);
0332 
0333         status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
0334                       DLM_LKF_CANCEL);
0335         if (status)
0336             user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
0337         goto drop_ref;
0338     }
0339 
0340     /* If there are still incompat holders, we can exit safely
0341      * without worrying about re-queueing this lock as that will
0342      * happen on the last call to user_cluster_unlock. */
0343     if ((lockres->l_blocking == DLM_LOCK_EX)
0344         && (lockres->l_ex_holders || lockres->l_ro_holders)) {
0345         spin_unlock(&lockres->l_lock);
0346         mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
0347              lockres->l_namelen, lockres->l_name,
0348              lockres->l_ex_holders, lockres->l_ro_holders);
0349         goto drop_ref;
0350     }
0351 
0352     if ((lockres->l_blocking == DLM_LOCK_PR)
0353         && lockres->l_ex_holders) {
0354         spin_unlock(&lockres->l_lock);
0355         mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
0356              lockres->l_namelen, lockres->l_name,
0357              lockres->l_ex_holders);
0358         goto drop_ref;
0359     }
0360 
0361     /* yay, we can downconvert now. */
0362     new_level = user_highest_compat_lock_level(lockres->l_blocking);
0363     lockres->l_requested = new_level;
0364     lockres->l_flags |= USER_LOCK_BUSY;
0365     mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
0366          lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
0367     spin_unlock(&lockres->l_lock);
0368 
0369     /* need lock downconvert request now... */
0370     status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
0371                 DLM_LKF_CONVERT|DLM_LKF_VALBLK,
0372                 lockres->l_name,
0373                 lockres->l_namelen);
0374     if (status) {
0375         user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
0376         user_recover_from_dlm_error(lockres);
0377     }
0378 
0379 drop_ref:
0380     user_dlm_drop_inode_ref(lockres);
0381 }
0382 
0383 static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
0384                     int level)
0385 {
0386     switch(level) {
0387     case DLM_LOCK_EX:
0388         lockres->l_ex_holders++;
0389         break;
0390     case DLM_LOCK_PR:
0391         lockres->l_ro_holders++;
0392         break;
0393     default:
0394         BUG();
0395     }
0396 }
0397 
0398 /* predict what lock level we'll be dropping down to on behalf
0399  * of another node, and return true if the currently wanted
0400  * level will be compatible with it. */
0401 static inline int
0402 user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
0403                   int wanted)
0404 {
0405     BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));
0406 
0407     return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
0408 }
0409 
0410 int user_dlm_cluster_lock(struct user_lock_res *lockres,
0411               int level,
0412               int lkm_flags)
0413 {
0414     int status, local_flags;
0415     struct ocfs2_cluster_connection *conn =
0416         cluster_connection_from_user_lockres(lockres);
0417 
0418     if (level != DLM_LOCK_EX &&
0419         level != DLM_LOCK_PR) {
0420         mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
0421              lockres->l_namelen, lockres->l_name);
0422         status = -EINVAL;
0423         goto bail;
0424     }
0425 
0426     mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
0427          lockres->l_namelen, lockres->l_name, level, lkm_flags);
0428 
0429 again:
0430     if (signal_pending(current)) {
0431         status = -ERESTARTSYS;
0432         goto bail;
0433     }
0434 
0435     spin_lock(&lockres->l_lock);
0436     if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
0437         spin_unlock(&lockres->l_lock);
0438         status = -EAGAIN;
0439         goto bail;
0440     }
0441 
0442     /* We only compare against the currently granted level
0443      * here. If the lock is blocked waiting on a downconvert,
0444      * we'll get caught below. */
0445     if ((lockres->l_flags & USER_LOCK_BUSY) &&
0446         (level > lockres->l_level)) {
0447         /* is someone sitting in dlm_lock? If so, wait on
0448          * them. */
0449         spin_unlock(&lockres->l_lock);
0450 
0451         user_wait_on_busy_lock(lockres);
0452         goto again;
0453     }
0454 
0455     if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
0456         (!user_may_continue_on_blocked_lock(lockres, level))) {
0457         /* is the lock is currently blocked on behalf of
0458          * another node */
0459         spin_unlock(&lockres->l_lock);
0460 
0461         user_wait_on_blocked_lock(lockres);
0462         goto again;
0463     }
0464 
0465     if (level > lockres->l_level) {
0466         local_flags = lkm_flags | DLM_LKF_VALBLK;
0467         if (lockres->l_level != DLM_LOCK_IV)
0468             local_flags |= DLM_LKF_CONVERT;
0469 
0470         lockres->l_requested = level;
0471         lockres->l_flags |= USER_LOCK_BUSY;
0472         spin_unlock(&lockres->l_lock);
0473 
0474         BUG_ON(level == DLM_LOCK_IV);
0475         BUG_ON(level == DLM_LOCK_NL);
0476 
0477         /* call dlm_lock to upgrade lock now */
0478         status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
0479                     local_flags, lockres->l_name,
0480                     lockres->l_namelen);
0481         if (status) {
0482             if ((lkm_flags & DLM_LKF_NOQUEUE) &&
0483                 (status != -EAGAIN))
0484                 user_log_dlm_error("ocfs2_dlm_lock",
0485                            status, lockres);
0486             user_recover_from_dlm_error(lockres);
0487             goto bail;
0488         }
0489 
0490         user_wait_on_busy_lock(lockres);
0491         goto again;
0492     }
0493 
0494     user_dlm_inc_holders(lockres, level);
0495     spin_unlock(&lockres->l_lock);
0496 
0497     status = 0;
0498 bail:
0499     return status;
0500 }
0501 
0502 static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
0503                     int level)
0504 {
0505     switch(level) {
0506     case DLM_LOCK_EX:
0507         BUG_ON(!lockres->l_ex_holders);
0508         lockres->l_ex_holders--;
0509         break;
0510     case DLM_LOCK_PR:
0511         BUG_ON(!lockres->l_ro_holders);
0512         lockres->l_ro_holders--;
0513         break;
0514     default:
0515         BUG();
0516     }
0517 }
0518 
0519 void user_dlm_cluster_unlock(struct user_lock_res *lockres,
0520                  int level)
0521 {
0522     if (level != DLM_LOCK_EX &&
0523         level != DLM_LOCK_PR) {
0524         mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
0525              lockres->l_namelen, lockres->l_name);
0526         return;
0527     }
0528 
0529     spin_lock(&lockres->l_lock);
0530     user_dlm_dec_holders(lockres, level);
0531     __user_dlm_cond_queue_lockres(lockres);
0532     spin_unlock(&lockres->l_lock);
0533 }
0534 
0535 void user_dlm_write_lvb(struct inode *inode,
0536             const char *val,
0537             unsigned int len)
0538 {
0539     struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
0540     char *lvb;
0541 
0542     BUG_ON(len > DLM_LVB_LEN);
0543 
0544     spin_lock(&lockres->l_lock);
0545 
0546     BUG_ON(lockres->l_level < DLM_LOCK_EX);
0547     lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
0548     memcpy(lvb, val, len);
0549 
0550     spin_unlock(&lockres->l_lock);
0551 }
0552 
0553 bool user_dlm_read_lvb(struct inode *inode, char *val)
0554 {
0555     struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
0556     char *lvb;
0557     bool ret = true;
0558 
0559     spin_lock(&lockres->l_lock);
0560 
0561     BUG_ON(lockres->l_level < DLM_LOCK_PR);
0562     if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
0563         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
0564         memcpy(val, lvb, DLM_LVB_LEN);
0565     } else
0566         ret = false;
0567 
0568     spin_unlock(&lockres->l_lock);
0569     return ret;
0570 }
0571 
0572 void user_dlm_lock_res_init(struct user_lock_res *lockres,
0573                 struct dentry *dentry)
0574 {
0575     memset(lockres, 0, sizeof(*lockres));
0576 
0577     spin_lock_init(&lockres->l_lock);
0578     init_waitqueue_head(&lockres->l_event);
0579     lockres->l_level = DLM_LOCK_IV;
0580     lockres->l_requested = DLM_LOCK_IV;
0581     lockres->l_blocking = DLM_LOCK_IV;
0582 
0583     /* should have been checked before getting here. */
0584     BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
0585 
0586     memcpy(lockres->l_name,
0587            dentry->d_name.name,
0588            dentry->d_name.len);
0589     lockres->l_namelen = dentry->d_name.len;
0590 }
0591 
/*
 * Tear down the DLM lock behind @lockres.
 *
 * Returns 0 on success (including the case where no lock was ever
 * attached), -EBUSY if a teardown is already in progress or local
 * holders remain, or the error from ocfs2_dlm_unlock().
 *
 * USER_LOCK_IN_TEARDOWN is cleared again on the "holders remain" and
 * "unlock failed" paths; on the success paths it stays set.
 */
int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
    int status = -EBUSY;
    struct ocfs2_cluster_connection *conn =
        cluster_connection_from_user_lockres(lockres);

    mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

    spin_lock(&lockres->l_lock);
    /* Someone else is already tearing this resource down. */
    if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
        spin_unlock(&lockres->l_lock);
        goto bail;
    }

    lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

    /* Let any in-flight request finish; l_lock is dropped while
     * waiting and re-taken before the flag is re-checked. */
    while (lockres->l_flags & USER_LOCK_BUSY) {
        spin_unlock(&lockres->l_lock);

        user_wait_on_busy_lock(lockres);

        spin_lock(&lockres->l_lock);
    }

    /* Still in use locally: back out of the teardown. */
    if (lockres->l_ro_holders || lockres->l_ex_holders) {
        lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
        spin_unlock(&lockres->l_lock);
        goto bail;
    }

    status = 0;
    if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
        /*
         * lock is never requested, leave USER_LOCK_IN_TEARDOWN set
         * to avoid new lock request coming in.
         */
        spin_unlock(&lockres->l_lock);
        goto bail;
    }

    /* Mark busy so we can wait for the unlock AST below. */
    lockres->l_flags |= USER_LOCK_BUSY;
    spin_unlock(&lockres->l_lock);

    status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
    if (status) {
        /* The unlock was not issued: undo both flags set above. */
        spin_lock(&lockres->l_lock);
        lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
        lockres->l_flags &= ~USER_LOCK_BUSY;
        spin_unlock(&lockres->l_lock);
        user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
        goto bail;
    }

    /* Wait until USER_LOCK_BUSY is cleared again (user_unlock_ast
     * clears it on the teardown path). */
    user_wait_on_busy_lock(lockres);

    status = 0;
bail:
    return status;
}
0651 
/*
 * Recovery callback handed to ocfs2_cluster_connect_agnostic() by
 * user_dlm_register(). Recovery events are deliberately ignored.
 */
static void user_dlm_recovery_handler_noop(int node_num,
                       void *recovery_data)
{
    /* We ignore recovery events */
}
0658 
/*
 * Hand the maximum locking protocol version declared in
 * user_dlm_lproto to the stack glue layer.
 */
void user_dlm_set_locking_protocol(void)
{
    ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}
0663 
0664 struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
0665 {
0666     int rc;
0667     struct ocfs2_cluster_connection *conn;
0668 
0669     rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
0670                         &user_dlm_lproto,
0671                         user_dlm_recovery_handler_noop,
0672                         NULL, &conn);
0673     if (rc)
0674         mlog_errno(rc);
0675 
0676     return rc ? ERR_PTR(rc) : conn;
0677 }
0678 
/*
 * Disconnect @conn from the cluster. The second argument to
 * ocfs2_cluster_disconnect() is passed as 0.
 */
void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
    ocfs2_cluster_disconnect(conn, 0);
}