Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * dlmunlock.c
0004  *
0005  * underlying calls for unlocking locks
0006  *
0007  * Copyright (C) 2004 Oracle.  All rights reserved.
0008  */
0009 
0010 
0011 #include <linux/module.h>
0012 #include <linux/fs.h>
0013 #include <linux/types.h>
0014 #include <linux/highmem.h>
0015 #include <linux/init.h>
0016 #include <linux/sysctl.h>
0017 #include <linux/random.h>
0018 #include <linux/blkdev.h>
0019 #include <linux/socket.h>
0020 #include <linux/inet.h>
0021 #include <linux/spinlock.h>
0022 #include <linux/delay.h>
0023 
0024 #include "../cluster/heartbeat.h"
0025 #include "../cluster/nodemanager.h"
0026 #include "../cluster/tcp.h"
0027 
0028 #include "dlmapi.h"
0029 #include "dlmcommon.h"
0030 
0031 #define MLOG_MASK_PREFIX ML_DLM
0032 #include "../cluster/masklog.h"
0033 
0034 #define DLM_UNLOCK_FREE_LOCK           0x00000001
0035 #define DLM_UNLOCK_CALL_AST            0x00000002
0036 #define DLM_UNLOCK_REMOVE_LOCK         0x00000004
0037 #define DLM_UNLOCK_REGRANT_LOCK        0x00000008
0038 #define DLM_UNLOCK_CLEAR_CONVERT_TYPE  0x00000010
0039 
0040 
0041 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
0042                           struct dlm_lock_resource *res,
0043                           struct dlm_lock *lock,
0044                           struct dlm_lockstatus *lksb,
0045                           int *actions);
0046 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
0047                           struct dlm_lock_resource *res,
0048                           struct dlm_lock *lock,
0049                           struct dlm_lockstatus *lksb,
0050                           int *actions);
0051 
0052 static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
0053                          struct dlm_lock_resource *res,
0054                          struct dlm_lock *lock,
0055                          struct dlm_lockstatus *lksb,
0056                          int flags,
0057                          u8 owner);
0058 
0059 
0060 /*
0061  * according to the spec:
0062  * http://opendlm.sourceforge.net/cvsmirror/opendlm/docs/dlmbook_final.pdf
0063  *
0064  *  flags & LKM_CANCEL != 0: must be converting or blocked
0065  *  flags & LKM_CANCEL == 0: must be granted
0066  *
0067  * So to unlock a converting lock, you must first cancel the
0068  * convert (passing LKM_CANCEL in flags), then call the unlock
0069  * again (with no LKM_CANCEL in flags).
0070  */
0071 
0072 
0073 /*
0074  * locking:
0075  *   caller needs:  none
0076  *   taken:         res->spinlock and lock->spinlock taken and dropped
0077  *   held on exit:  none
0078  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
0079  * all callers should have taken an extra ref on lock coming in
0080  */
/*
 * Core of both the master-local and remote unlock/cancel paths.
 * Computes the set of DLM_UNLOCK_* actions for this request, performs
 * the list/refcount manipulation under res->spinlock + lock->spinlock,
 * and (for the remote case) sends the request to the owner over the
 * network with both spinlocks dropped.
 */
static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
					struct dlm_lock_resource *res,
					struct dlm_lock *lock,
					struct dlm_lockstatus *lksb,
					int flags, int *call_ast,
					int master_node)
{
	enum dlm_status status;
	int actions = 0;	/* bitmask of DLM_UNLOCK_* follow-up work */
	int in_use;
	u8 owner;
	int recovery_wait = 0;

	mlog(0, "master_node = %d, valblk = %d\n", master_node,
	     flags & LKM_VALBLK);

	/* the caller chose the master vs. remote path; the resource's
	 * recorded owner must agree with that choice */
	if (master_node)
		BUG_ON(res->owner != dlm->node_num);
	else
		BUG_ON(res->owner == dlm->node_num);

	spin_lock(&dlm->ast_lock);
	/* We want to be sure that we're not freeing a lock
	 * that still has AST's pending... */
	in_use = !list_empty(&lock->ast_list);
	spin_unlock(&dlm->ast_lock);
	if (in_use && !(flags & LKM_CANCEL)) {
	       mlog(ML_ERROR, "lockres %.*s: Someone is calling dlmunlock "
		    "while waiting for an ast!", res->lockname.len,
		    res->lockname.name);
		return DLM_BADPARAM;
	}

	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
		if (master_node && !(flags & LKM_CANCEL)) {
			mlog(ML_ERROR, "lockres in progress!\n");
			spin_unlock(&res->spinlock);
			return DLM_FORWARD;
		}
		/* ok for this to sleep if not in a network handler */
		__dlm_wait_on_lockres(res);
		res->state |= DLM_LOCK_RES_IN_PROGRESS;
	}
	spin_lock(&lock->spinlock);

	/* a recovering or migrating resource cannot be touched; the
	 * caller (dlmunlock) retries once that state clears */
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		status = DLM_RECOVERING;
		goto leave;
	}

	if (res->state & DLM_LOCK_RES_MIGRATING) {
		status = DLM_MIGRATING;
		goto leave;
	}

	/* see above for what the spec says about
	 * LKM_CANCEL and the lock queue state */
	if (flags & LKM_CANCEL)
		status = dlm_get_cancel_actions(dlm, res, lock, lksb, &actions);
	else
		status = dlm_get_unlock_actions(dlm, res, lock, lksb, &actions);

	/* DLM_CANCELGRANT on the master is not fatal: the convert has
	 * already been granted, but the ast below must still fire */
	if (status != DLM_NORMAL && (status != DLM_CANCELGRANT || !master_node))
		goto leave;

	/* By now this has been masked out of cancel requests. */
	if (flags & LKM_VALBLK) {
		/* make the final update to the lvb */
		if (master_node)
			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
		else
			flags |= LKM_PUT_LVB; /* let the send function
					       * handle it. */
	}

	if (!master_node) {
		owner = res->owner;
		/* drop locks and send message */
		if (flags & LKM_CANCEL)
			lock->cancel_pending = 1;
		else
			lock->unlock_pending = 1;
		spin_unlock(&lock->spinlock);
		spin_unlock(&res->spinlock);
		/* both spinlocks are dropped across the network call;
		 * res->state and the queues may change before we relock */
		status = dlm_send_remote_unlock_request(dlm, res, lock, lksb,
							flags, owner);
		spin_lock(&res->spinlock);
		spin_lock(&lock->spinlock);
		/* if the master told us the lock was already granted,
		 * let the ast handle all of these actions */
		if (status == DLM_CANCELGRANT) {
			actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
				     DLM_UNLOCK_REGRANT_LOCK|
				     DLM_UNLOCK_CLEAR_CONVERT_TYPE);
		} else if (status == DLM_RECOVERING ||
			   status == DLM_MIGRATING ||
			   status == DLM_FORWARD ||
			   status == DLM_NOLOCKMGR
			   ) {
			/* must clear the actions because this unlock
			 * is about to be retried.  cannot free or do
			 * any list manipulation. */
			mlog(0, "%s:%.*s: clearing actions, %s\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name,
			     status==DLM_RECOVERING?"recovering":
			     (status==DLM_MIGRATING?"migrating":
			    (status == DLM_FORWARD ? "forward" :
						"nolockmanager")));
			actions = 0;
		}
		if (flags & LKM_CANCEL)
			lock->cancel_pending = 0;
		else {
			/* unlock_pending cleared while the spinlocks were
			 * dropped means someone else (recovery) completed
			 * the unlock for us; wait for RECOVERING below */
			if (!lock->unlock_pending)
				recovery_wait = 1;
			else
				lock->unlock_pending = 0;
		}
	}

	/* get an extra ref on lock.  if we are just switching
	 * lists here, we dont want the lock to go away. */
	dlm_lock_get(lock);

	if (actions & DLM_UNLOCK_REMOVE_LOCK) {
		/* drop the ref that the queue membership held */
		list_del_init(&lock->list);
		dlm_lock_put(lock);
	}
	if (actions & DLM_UNLOCK_REGRANT_LOCK) {
		/* re-queueing takes its own ref */
		dlm_lock_get(lock);
		list_add_tail(&lock->list, &res->granted);
	}
	if (actions & DLM_UNLOCK_CLEAR_CONVERT_TYPE) {
		mlog(0, "clearing convert_type at %smaster node\n",
		     master_node ? "" : "non-");
		lock->ml.convert_type = LKM_IVMODE;
	}

	/* remove the extra ref on lock */
	dlm_lock_put(lock);

leave:
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	/* sanity: convert_type must be set iff the lock still sits on
	 * the converting queue */
	if (!dlm_lock_on_list(&res->converting, lock))
		BUG_ON(lock->ml.convert_type != LKM_IVMODE);
	else
		BUG_ON(lock->ml.convert_type == LKM_IVMODE);
	spin_unlock(&lock->spinlock);
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	if (recovery_wait) {
		spin_lock(&res->spinlock);
		/* Unlock request will directly succeed after owner dies,
		 * and the lock is already removed from grant list. We have to
		 * wait for RECOVERING done or we miss the chance to purge it
		 * since the removement is much faster than RECOVERING proc.
		 */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_RECOVERING);
		spin_unlock(&res->spinlock);
	}

	/* let the caller's final dlm_lock_put handle the actual kfree */
	if (actions & DLM_UNLOCK_FREE_LOCK) {
		/* this should always be coupled with list removal */
		BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
		mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     kref_read(&lock->lock_refs)-1);
		dlm_lock_put(lock);
	}
	if (actions & DLM_UNLOCK_CALL_AST)
		*call_ast = 1;

	/* if cancel or unlock succeeded, lvb work is done */
	if (status == DLM_NORMAL)
		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);

	return status;
}
0264 
/*
 * Finish an unlock that was pending when its owner changed: simply
 * drop the lock from whatever queue it is on.
 * NOTE(review): callers appear to hold res->spinlock -- confirm against
 * the recovery code before relying on it.
 */
void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
			       struct dlm_lock *lock)
{
	/* leave DLM_LKSB_PUT_LVB on the lksb so any final
	 * update of the lvb will be sent to the new master */
	list_del_init(&lock->list);
}
0272 
/*
 * Finish a convert-cancel that was pending when its owner changed:
 * the lock returns to the granted queue at its previous level and the
 * requested convert target is forgotten.
 */
void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
			       struct dlm_lock *lock)
{
	list_move_tail(&lock->list, &res->granted);
	lock->ml.convert_type = LKM_IVMODE;
}
0279 
0280 
/* unlock/cancel a lock on a resource mastered by this node */
static inline enum dlm_status dlmunlock_master(struct dlm_ctxt *dlm,
					      struct dlm_lock_resource *res,
					      struct dlm_lock *lock,
					      struct dlm_lockstatus *lksb,
					      int flags,
					      int *call_ast)
{
	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
}
0290 
/* unlock/cancel a lock on a resource mastered by another node */
static inline enum dlm_status dlmunlock_remote(struct dlm_ctxt *dlm,
					      struct dlm_lock_resource *res,
					      struct dlm_lock *lock,
					      struct dlm_lockstatus *lksb,
					      int flags, int *call_ast)
{
	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
}
0299 
0300 /*
0301  * locking:
0302  *   caller needs:  none
0303  *   taken:         none
0304  *   held on exit:  none
0305  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
0306  */
/*
 * Build and send a DLM_UNLOCK_LOCK_MSG to the resource owner and map
 * the transport result onto a dlm_status for the caller to act on.
 */
static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
						 struct dlm_lock_resource *res,
						 struct dlm_lock *lock,
						 struct dlm_lockstatus *lksb,
						 int flags,
						 u8 owner)
{
	struct dlm_unlock_lock unlock;
	int tmpret;
	enum dlm_status ret;
	int status = 0;		/* dlm_status returned by the remote handler */
	struct kvec vec[2];
	size_t veclen = 1;

	mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);

	if (owner == dlm->node_num) {
		/* ended up trying to contact ourself.  this means
		 * that the lockres had been remote but became local
		 * via a migration.  just retry it, now as local */
		mlog(0, "%s:%.*s: this node became the master due to a "
		     "migration, re-evaluate now\n", dlm->name,
		     res->lockname.len, res->lockname.name);
		return DLM_FORWARD;
	}

	memset(&unlock, 0, sizeof(unlock));
	unlock.node_idx = dlm->node_num;
	unlock.flags = cpu_to_be32(flags);
	unlock.cookie = lock->ml.cookie;
	unlock.namelen = res->lockname.len;
	memcpy(unlock.name, res->lockname.name, unlock.namelen);

	vec[0].iov_len = sizeof(struct dlm_unlock_lock);
	vec[0].iov_base = &unlock;

	if (flags & LKM_PUT_LVB) {
		/* extra data to send if we are updating lvb */
		vec[1].iov_len = DLM_LVB_LEN;
		vec[1].iov_base = lock->lksb->lvb;
		veclen++;
	}

	tmpret = o2net_send_message_vec(DLM_UNLOCK_LOCK_MSG, dlm->key,
					vec, veclen, owner, &status);
	if (tmpret >= 0) {
		// successfully sent and received
		if (status == DLM_FORWARD)
			mlog(0, "master was in-progress.  retry\n");
		ret = status;
	} else {
		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
		     "node %u\n", tmpret, DLM_UNLOCK_LOCK_MSG, dlm->key, owner);
		if (dlm_is_host_down(tmpret)) {
			/* NOTE: this seems strange, but it is what we want.
			 * when the master goes down during a cancel or
			 * unlock, the recovery code completes the operation
			 * as if the master had not died, then passes the
			 * updated state to the recovery master.  this thread
			 * just needs to finish out the operation and call
			 * the unlockast. */
			if (dlm_is_node_dead(dlm, owner))
				ret = DLM_NORMAL;
			else
				ret = DLM_NOLOCKMGR;
		} else {
			/* something bad.  this will BUG in ocfs2 */
			ret = dlm_err_to_dlm_status(tmpret);
		}
	}

	return ret;
}
0380 
0381 /*
0382  * locking:
0383  *   caller needs:  none
0384  *   taken:         takes and drops res->spinlock
0385  *   held on exit:  none
0386  * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
0387  *          return value from dlmunlock_master
0388  */
/*
 * Network handler for DLM_UNLOCK_LOCK_MSG: validate the request, find
 * the lock on this (master) node's queues, and run the master-side
 * unlock.  Returns a dlm_status; DLM_FORWARD tells the sender to retry.
 */
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
			    void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *lock = NULL, *iter;
	enum dlm_status status = DLM_NORMAL;
	int i;
	struct dlm_lockstatus *lksb = NULL;
	int ignore;		/* call_ast result; asts fire on the origin node */
	u32 flags;
	struct list_head *queue;

	flags = be32_to_cpu(unlock->flags);

	/* argument sanity: an unlock can put an lvb but never get one,
	 * and a cancel may not touch the lvb at all */
	if (flags & LKM_GET_LVB) {
		mlog(ML_ERROR, "bad args!  GET_LVB specified on unlock!\n");
		return DLM_BADARGS;
	}

	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == (LKM_PUT_LVB|LKM_CANCEL)) {
		mlog(ML_ERROR, "bad args!  cannot modify lvb on a CANCEL "
		     "request!\n");
		return DLM_BADARGS;
	}

	if (unlock->namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length in unlock handler!\n");
		return DLM_IVBUFLEN;
	}

	if (!dlm_grab(dlm))
		return DLM_FORWARD;

	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
			"Domain %s not fully joined!\n", dlm->name);

	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");

	res = dlm_lookup_lockres(dlm, unlock->name, unlock->namelen);
	if (!res) {
		/* We assume here that a no lock resource simply means
		 * it was migrated away and destroyed before the other
		 * node could detect it. */
		mlog(0, "returning DLM_FORWARD -- res no longer exists\n");
		status = DLM_FORWARD;
		goto not_found;
	}

	queue=&res->granted;
	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_RECOVERING\n");
		status = DLM_RECOVERING;
		goto leave;
	}

	if (res->state & DLM_LOCK_RES_MIGRATING) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_MIGRATING\n");
		status = DLM_MIGRATING;
		goto leave;
	}

	if (res->owner != dlm->node_num) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_FORWARD -- not master\n");
		status = DLM_FORWARD;
		goto leave;
	}

	/* match on (cookie, node) across the three per-resource queues;
	 * a found lock gets an extra ref dropped at not_found below */
	for (i=0; i<3; i++) {
		list_for_each_entry(iter, queue, list) {
			if (iter->ml.cookie == unlock->cookie &&
			    iter->ml.node == unlock->node_idx) {
				dlm_lock_get(iter);
				lock = iter;
				break;
			}
		}
		if (lock)
			break;
		/* scan granted -> converting -> blocked queues */
		queue++;
	}
	spin_unlock(&res->spinlock);
	if (!lock) {
		status = DLM_IVLOCKID;
		goto not_found;
	}

	/* lock was found on queue */
	lksb = lock->lksb;
	/* lvb updates are only meaningful for an EX-mode holder */
	if (flags & (LKM_VALBLK|LKM_PUT_LVB) &&
	    lock->ml.type != LKM_EXMODE)
		flags &= ~(LKM_VALBLK|LKM_PUT_LVB);

	/* unlockast only called on originating node */
	if (flags & LKM_PUT_LVB) {
		lksb->flags |= DLM_LKSB_PUT_LVB;
		memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
	}

	/* if this is in-progress, propagate the DLM_FORWARD
	 * all the way back out */
	status = dlmunlock_master(dlm, res, lock, lksb, flags, &ignore);
	if (status == DLM_FORWARD)
		mlog(0, "lockres is in progress\n");

	if (flags & LKM_PUT_LVB)
		lksb->flags &= ~DLM_LKSB_PUT_LVB;

	dlm_lockres_calc_usage(dlm, res);
	dlm_kick_thread(dlm, res);

not_found:
	if (!lock)
		mlog(ML_ERROR, "failed to find lock to unlock! "
			       "cookie=%u:%llu\n",
		     dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)));
	else
		dlm_lock_put(lock);

leave:
	if (res)
		dlm_lockres_put(res);

	dlm_put(dlm);

	return status;
}
0523 
0524 
0525 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
0526                           struct dlm_lock_resource *res,
0527                           struct dlm_lock *lock,
0528                           struct dlm_lockstatus *lksb,
0529                           int *actions)
0530 {
0531     enum dlm_status status;
0532 
0533     if (dlm_lock_on_list(&res->blocked, lock)) {
0534         /* cancel this outright */
0535         status = DLM_NORMAL;
0536         *actions = (DLM_UNLOCK_CALL_AST |
0537                 DLM_UNLOCK_REMOVE_LOCK);
0538     } else if (dlm_lock_on_list(&res->converting, lock)) {
0539         /* cancel the request, put back on granted */
0540         status = DLM_NORMAL;
0541         *actions = (DLM_UNLOCK_CALL_AST |
0542                 DLM_UNLOCK_REMOVE_LOCK |
0543                 DLM_UNLOCK_REGRANT_LOCK |
0544                 DLM_UNLOCK_CLEAR_CONVERT_TYPE);
0545     } else if (dlm_lock_on_list(&res->granted, lock)) {
0546         /* too late, already granted. */
0547         status = DLM_CANCELGRANT;
0548         *actions = DLM_UNLOCK_CALL_AST;
0549     } else {
0550         mlog(ML_ERROR, "lock to cancel is not on any list!\n");
0551         status = DLM_IVLOCKID;
0552         *actions = 0;
0553     }
0554     return status;
0555 }
0556 
0557 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
0558                           struct dlm_lock_resource *res,
0559                           struct dlm_lock *lock,
0560                           struct dlm_lockstatus *lksb,
0561                           int *actions)
0562 {
0563     enum dlm_status status;
0564 
0565     /* unlock request */
0566     if (!dlm_lock_on_list(&res->granted, lock)) {
0567         status = DLM_DENIED;
0568         dlm_error(status);
0569         *actions = 0;
0570     } else {
0571         /* unlock granted lock */
0572         status = DLM_NORMAL;
0573         *actions = (DLM_UNLOCK_FREE_LOCK |
0574                 DLM_UNLOCK_CALL_AST |
0575                 DLM_UNLOCK_REMOVE_LOCK);
0576     }
0577     return status;
0578 }
0579 
0580 /* there seems to be no point in doing this async
0581  * since (even for the remote case) there is really
0582  * no work to queue up... so just do it and fire the
0583  * unlockast by hand when done... */
/*
 * Public unlock/cancel entry point.  Validates arguments, dispatches to
 * the master or remote path, retries while the resource is recovering/
 * migrating/forwarded, then fires the caller's unlockast by hand.
 * Returns DLM_NORMAL on success (including a cancel that lost the race,
 * DLM_CANCELGRANT is folded into DLM_NORMAL before returning).
 */
enum dlm_status dlmunlock(struct dlm_ctxt *dlm, struct dlm_lockstatus *lksb,
			  int flags, dlm_astunlockfunc_t *unlockast, void *data)
{
	enum dlm_status status;
	struct dlm_lock_resource *res;
	struct dlm_lock *lock = NULL;
	int call_ast, is_master;

	if (!lksb) {
		dlm_error(DLM_BADARGS);
		return DLM_BADARGS;
	}

	/* only cancel and lvb-handling flags are meaningful here */
	if (flags & ~(LKM_CANCEL | LKM_VALBLK | LKM_INVVALBLK)) {
		dlm_error(DLM_BADPARAM);
		return DLM_BADPARAM;
	}

	if ((flags & (LKM_VALBLK | LKM_CANCEL)) == (LKM_VALBLK | LKM_CANCEL)) {
		mlog(0, "VALBLK given with CANCEL: ignoring VALBLK\n");
		flags &= ~LKM_VALBLK;
	}

	if (!lksb->lockid || !lksb->lockid->lockres) {
		dlm_error(DLM_BADPARAM);
		return DLM_BADPARAM;
	}

	/* hold refs on the lock and lockres across the whole call */
	lock = lksb->lockid;
	BUG_ON(!lock);
	dlm_lock_get(lock);

	res = lock->lockres;
	BUG_ON(!res);
	dlm_lockres_get(res);
retry:
	call_ast = 0;
	/* need to retry up here because owner may have changed */
	mlog(0, "lock=%p res=%p\n", lock, res);

	spin_lock(&res->spinlock);
	is_master = (res->owner == dlm->node_num);
	/* lvb is only valid from an EX-mode holder */
	if (flags & LKM_VALBLK && lock->ml.type != LKM_EXMODE)
		flags &= ~LKM_VALBLK;
	spin_unlock(&res->spinlock);

	if (is_master) {
		status = dlmunlock_master(dlm, res, lock, lksb, flags,
					  &call_ast);
		mlog(0, "done calling dlmunlock_master: returned %d, "
		     "call_ast is %d\n", status, call_ast);
	} else {
		status = dlmunlock_remote(dlm, res, lock, lksb, flags,
					  &call_ast);
		mlog(0, "done calling dlmunlock_remote: returned %d, "
		     "call_ast is %d\n", status, call_ast);
	}

	if (status == DLM_RECOVERING ||
	    status == DLM_MIGRATING ||
	    status == DLM_FORWARD ||
	    status == DLM_NOLOCKMGR) {

		/* We want to go away for a tiny bit to allow recovery
		 * / migration to complete on this resource. I don't
		 * know of any wait queue we could sleep on as this
		 * may be happening on another node. Perhaps the
		 * proper solution is to queue up requests on the
		 * other end? */

		/* do we want to yield(); ?? */
		msleep(50);

		mlog(0, "retrying unlock due to pending recovery/"
		     "migration/in-progress/reconnect\n");
		goto retry;
	}

	if (call_ast) {
		mlog(0, "calling unlockast(%p, %d)\n", data, status);
		if (is_master) {
			/* it is possible that there is one last bast
			 * pending.  make sure it is flushed, then
			 * call the unlockast.
			 * not an issue if this is a mastered remotely,
			 * since this lock has been removed from the
			 * lockres queues and cannot be found. */
			dlm_kick_thread(dlm, NULL);
			wait_event(dlm->ast_wq,
				   dlm_lock_basts_flushed(dlm, lock));
		}
		(*unlockast)(data, status);
	}

	/* a cancel that raced with the grant still succeeded overall */
	if (status == DLM_CANCELGRANT)
		status = DLM_NORMAL;

	if (status == DLM_NORMAL) {
		mlog(0, "kicking the thread\n");
		dlm_kick_thread(dlm, res);
	} else
		dlm_error(status);

	dlm_lockres_calc_usage(dlm, res);
	dlm_lockres_put(res);
	dlm_lock_put(lock);

	mlog(0, "returning status=%d!\n", status);
	return status;
}
EXPORT_SYMBOL_GPL(dlmunlock);
0695