// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/delay.h>
#include <linux/quotaops.h>
#include <linux/sched/signal.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"
#include "acl.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
    struct list_head    mw_item;
    int         mw_status;
    struct completion   mw_complete;
    unsigned long       mw_mask;
    unsigned long       mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
    ktime_t         mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
    UNBLOCK_CONTINUE    = 0, /* Continue downconvert */
    UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
                      * ->post_unlock callback */
    UNBLOCK_STOP_POST   = 2, /* Do not downconvert, fire
                      * ->post_unlock() callback. */
};
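
/*
 * Illustrative sketch (not part of the original file): what a minimal
 * ->downconvert_worker could look like under the contract above.  The
 * helper example_sync_private_state() is hypothetical; real workers
 * such as ocfs2_data_convert_worker() do the equivalent flushing work.
 */
#if 0
static int example_convert_worker(struct ocfs2_lock_res *lockres,
                  int blocking)
{
    /* Called without any spinlocks held, so blocking work is fine. */
    if (blocking == DLM_LOCK_EX)
        example_sync_private_state(lockres); /* hypothetical helper */

    /* Continue the downconvert, then fire ->post_unlock(). */
    return UNBLOCK_CONTINUE_POST;
}
#endif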

struct ocfs2_unblock_ctl {
    int requeue;
    enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
#endif

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                    int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
                       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
                        int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
                     int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
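
/* Typical use (sketch): mlog_meta_lvb(0, lockres); the macro fills in
 * the calling function name and line number automatically. */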

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
                     const char *function,
                     unsigned int line,
                     struct ocfs2_lock_res *lockres)
{
    struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

    mlog(level, "LVB information for %s (called from %s:%u):\n",
         lockres->l_name, function, line);
    mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
         lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
         be32_to_cpu(lvb->lvb_igeneration));
    mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
         (unsigned long long)be64_to_cpu(lvb->lvb_isize),
         be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
         be16_to_cpu(lvb->lvb_imode));
    mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
         "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
         (long long)be64_to_cpu(lvb->lvb_iatime_packed),
         (long long)be64_to_cpu(lvb->lvb_ictime_packed),
         (long long)be64_to_cpu(lvb->lvb_imtime_packed),
         be32_to_cpu(lvb->lvb_iattr));
}

/*
 * OCFS2 Lock Resource Operations
 *
 * These fine-tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
    /*
     * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
     * this callback if ->l_priv is not an ocfs2_super pointer
     */
    struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

    /*
     * Optionally called in the downconvert thread after a
     * successful downconvert. The lockres will not be referenced
     * after this callback is called, so it is safe to free
     * memory, etc.
     *
     * The exact semantics of when this is called are controlled
     * by ->downconvert_worker()
     */
    void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

    /*
     * Allow a lock type to add checks to determine whether it is
     * safe to downconvert a lock. Return 0 to re-queue the
     * downconvert at a later time, nonzero to continue.
     *
     * For most locks, the default checks that there are no
     * incompatible holders are sufficient.
     *
     * Called with the lockres spinlock held.
     */
    int (*check_downconvert)(struct ocfs2_lock_res *, int);

    /*
     * Allows a lock type to populate the lock value block. This
     * is called on downconvert, and when we drop a lock.
     *
     * Locks that want to use this should set LOCK_TYPE_USES_LVB
     * in the flags field.
     *
     * Called with the lockres spinlock held.
     */
    void (*set_lvb)(struct ocfs2_lock_res *);

    /*
     * Called from the downconvert thread when it is determined
     * that a lock will be downconverted. This is called without
     * any locks held so the function can do work that might
     * schedule (syncing out data, etc).
     *
     * This should return any one of the ocfs2_unblock_action
     * values, depending on what it wants the thread to do.
     */
    int (*downconvert_worker)(struct ocfs2_lock_res *, int);

    /*
     * LOCK_TYPE_* flags which describe the specific requirements
     * of a lock type. Descriptions of each individual flag follow.
     */
    int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB      0x2
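
/*
 * Illustrative sketch (not part of the original file): the pattern a
 * locking wrapper is expected to follow for LOCK_TYPE_REQUIRES_REFRESH
 * lock types.  example_refresh() is hypothetical; dlmglue's inode lock
 * does the equivalent via ocfs2_inode_lock_update().
 */
#if 0
    ret = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
    if (!ret && (lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
        example_refresh(lockres);   /* re-read the stale state */
        /* ... and clear OCFS2_LOCK_NEEDS_REFRESH when done. */
    }
#endif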

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
    .get_osb    = ocfs2_get_inode_osb,
    .flags      = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
    .get_osb    = ocfs2_get_inode_osb,
    .check_downconvert = ocfs2_check_meta_downconvert,
    .set_lvb    = ocfs2_set_meta_lvb,
    .downconvert_worker = ocfs2_data_convert_worker,
    .flags      = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
    .flags      = LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
    .flags      = 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
    .flags      = 0,
};

static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
    .flags      = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
    .flags      = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
    .get_osb    = ocfs2_get_dentry_osb,
    .post_unlock    = ocfs2_dentry_post_unlock,
    .downconvert_worker = ocfs2_dentry_convert_worker,
    .flags      = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
    .get_osb    = ocfs2_get_inode_osb,
    .flags      = 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
    .get_osb    = ocfs2_get_file_osb,
    .flags      = 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
    .set_lvb    = ocfs2_set_qinfo_lvb,
    .get_osb    = ocfs2_get_qinfo_osb,
    .flags      = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
    .check_downconvert = ocfs2_check_refcount_downconvert,
    .downconvert_worker = ocfs2_refcount_convert_worker,
    .flags      = 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
    return lockres->l_type == OCFS2_LOCK_TYPE_META ||
        lockres->l_type == OCFS2_LOCK_TYPE_RW ||
        lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
    return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
    BUG_ON(!ocfs2_is_inode_lock(lockres));

    return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
    BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

    return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
    BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

    return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
    return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
    if (lockres->l_ops->get_osb)
        return lockres->l_ops->get_osb(lockres);

    return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
                 struct ocfs2_lock_res *lockres,
                 int level,
                 u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                             int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
                   struct ocfs2_lock_res *lockres,
                   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                    struct ocfs2_lock_res *lockres,
                    int level)
{
    __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                    struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                        int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                 \
    if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)               \
        mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",    \
             _err, _func, _lockres->l_name);                    \
    else                                        \
        mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",  \
             _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,  \
             (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));        \
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
                    struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
                  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
                          int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                  struct ocfs2_lock_res *lockres,
                  int new_level,
                  int lvb,
                  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                  u64 blkno,
                  u32 generation,
                  char *name)
{
    int len;

    BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

    len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
               ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
               (long long)blkno, generation);

    BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

    mlog(0, "built lock resource with name: %s\n", name);
}
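
/*
 * Example (sketch): assuming OCFS2_LOCK_ID_PAD is a short run of '0'
 * padding characters, a lock on block 5 with generation 0x1a2b3c4d
 * would be named "%c" "<pad>" "0000000000000005" "1a2b3c4d" run
 * together - one type character, the pad, sixteen hex digits of block
 * number and eight of generation, OCFS2_LOCK_ID_MAX_LEN - 1 characters
 * in all.
 */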

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
                       struct ocfs2_dlm_debug *dlm_debug)
{
    mlog(0, "Add tracking for lockres %s\n", res->l_name);

    spin_lock(&ocfs2_dlm_tracking_lock);
    list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
    spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
    spin_lock(&ocfs2_dlm_tracking_lock);
    if (!list_empty(&res->l_debug_list))
        list_del_init(&res->l_debug_list);
    spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
    res->l_lock_refresh = 0;
    res->l_lock_wait = 0;
    memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
    memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
                    struct ocfs2_mask_waiter *mw, int ret)
{
    u32 usec;
    ktime_t kt;
    struct ocfs2_lock_stats *stats;

    if (level == LKM_PRMODE)
        stats = &res->l_lock_prmode;
    else if (level == LKM_EXMODE)
        stats = &res->l_lock_exmode;
    else
        return;

    kt = ktime_sub(ktime_get(), mw->mw_lock_start);
    usec = ktime_to_us(kt);

    stats->ls_gets++;
    stats->ls_total += ktime_to_ns(kt);
    /* overflow */
    if (unlikely(stats->ls_gets == 0)) {
        stats->ls_gets++;
        stats->ls_total = ktime_to_ns(kt);
    }

    if (stats->ls_max < usec)
        stats->ls_max = usec;

    if (ret)
        stats->ls_fail++;

    stats->ls_last = ktime_to_us(ktime_get_real());
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
    lockres->l_lock_refresh++;
}

static inline void ocfs2_track_lock_wait(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_mask_waiter *mw;

    if (list_empty(&lockres->l_mask_waiters)) {
        lockres->l_lock_wait = 0;
        return;
    }

    mw = list_first_entry(&lockres->l_mask_waiters,
                struct ocfs2_mask_waiter, mw_item);
    lockres->l_lock_wait =
            ktime_to_us(ktime_mono_to_real(mw->mw_lock_start));
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
    mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
               int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_track_lock_wait(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                       struct ocfs2_lock_res *res,
                       enum ocfs2_lock_type type,
                       struct ocfs2_lock_res_ops *ops,
                       void *priv)
{
    res->l_type          = type;
    res->l_ops           = ops;
    res->l_priv          = priv;

    res->l_level         = DLM_LOCK_IV;
    res->l_requested     = DLM_LOCK_IV;
    res->l_blocking      = DLM_LOCK_IV;
    res->l_action        = OCFS2_AST_INVALID;
    res->l_unlock_action = OCFS2_UNLOCK_INVALID;

    res->l_flags         = OCFS2_LOCK_INITIALIZED;

    ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

    ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    if (type != OCFS2_LOCK_TYPE_OPEN)
        lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
                 &lockdep_keys[type], 0);
    else
        res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
    /* This also clears out the lock status block */
    memset(res, 0, sizeof(struct ocfs2_lock_res));
    spin_lock_init(&res->l_lock);
    init_waitqueue_head(&res->l_event);
    INIT_LIST_HEAD(&res->l_blocked_list);
    INIT_LIST_HEAD(&res->l_mask_waiters);
    INIT_LIST_HEAD(&res->l_holders);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                   enum ocfs2_lock_type type,
                   unsigned int generation,
                   struct inode *inode)
{
    struct ocfs2_lock_res_ops *ops;

    switch(type) {
        case OCFS2_LOCK_TYPE_RW:
            ops = &ocfs2_inode_rw_lops;
            break;
        case OCFS2_LOCK_TYPE_META:
            ops = &ocfs2_inode_inode_lops;
            break;
        case OCFS2_LOCK_TYPE_OPEN:
            ops = &ocfs2_inode_open_lops;
            break;
        default:
            mlog_bug_on_msg(1, "type: %d\n", type);
            ops = NULL; /* thanks, gcc */
            break;
    }

    ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
                  generation, res->l_name);
    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
    struct inode *inode = ocfs2_lock_res_inode(lockres);

    return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_mem_dqinfo *info = lockres->l_priv;

    return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_file_private *fp = lockres->l_priv;

    return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
    __be64 inode_blkno_be;

    memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
           sizeof(__be64));

    return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_dentry_lock *dl = lockres->l_priv;

    return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
                u64 parent, struct inode *inode)
{
    int len;
    u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
    __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
    struct ocfs2_lock_res *lockres = &dl->dl_lockres;

    ocfs2_lock_res_init_once(lockres);

    /*
     * Unfortunately, the standard lock naming scheme won't work
     * here because we have two 16 byte values to use. Instead,
     * we'll stuff the inode number as a binary value. We still
     * want error prints to show something without garbling the
     * display, so drop a null byte in there before the inode
     * number. A future version of OCFS2 will likely use all
     * binary lock names. The stringified names have been a
     * tremendous aid in debugging, but now that the debugfs
     * interface exists, we can mangle things there if need be.
     *
     * NOTE: We also drop the standard "pad" value (the total lock
     * name size stays the same though - the last part is all
     * zeros due to the memset in ocfs2_lock_res_init_once()).
     */
    len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
               "%c%016llx",
               ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
               (long long)parent);

    BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

    memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
           sizeof(__be64));

    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
                   dl);
}
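
/*
 * Resulting dentry lock name layout (derived from the code above):
 *
 *   byte 0      : lock type character for OCFS2_LOCK_TYPE_DENTRY
 *   bytes 1-16  : parent blkno as sixteen hex characters
 *   byte 17     : the '\0' snprintf wrote, kept so prints stay safe
 *   bytes 18-25 : inode blkno as a raw big-endian __be64
 *   rest        : zeros from the memset in ocfs2_lock_res_init_once()
 *
 * OCFS2_DENTRY_LOCK_INO_START is thus the offset of the binary blkno,
 * which ocfs2_get_dentry_lock_ino() above reads straight back out.
 */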

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
                      struct ocfs2_super *osb)
{
    /* Superblock lockres doesn't come from a slab so we call init
     * once on it manually.  */
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
                  0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
                   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                       struct ocfs2_super *osb)
{
    /* Rename lockres doesn't come from a slab so we call init
     * once on it manually.  */
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
                     struct ocfs2_super *osb)
{
    /* nfs_sync lockres doesn't come from a slab so we call init
     * once on it manually.  */
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
                   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_nfs_sync_lock_init(struct ocfs2_super *osb)
{
    ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
    init_rwsem(&osb->nfs_sync_rwlock);
}

void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
{
    struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

    /* Only one trimfs thread is allowed to work at the same time. */
    mutex_lock(&osb->obs_trim_fs_mutex);

    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
    ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
                   &ocfs2_trim_fs_lops, osb);
}

void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
{
    struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

    ocfs2_simple_drop_lockres(osb, lockres);
    ocfs2_lock_res_free(lockres);

    mutex_unlock(&osb->obs_trim_fs_mutex);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
                        struct ocfs2_super *osb)
{
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
                   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
                  struct ocfs2_file_private *fp)
{
    struct inode *inode = fp->fp_file->f_mapping->host;
    struct ocfs2_inode_info *oi = OCFS2_I(inode);

    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
                  inode->i_generation, lockres->l_name);
    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
                   fp);
    lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
                   struct ocfs2_mem_dqinfo *info)
{
    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
                  0, lockres->l_name);
    ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
                   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
                   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
                  struct ocfs2_super *osb, u64 ref_blkno,
                  unsigned int generation)
{
    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
                  generation, lockres->l_name);
    ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
                   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
    if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
        return;

    ocfs2_remove_lockres_tracking(res);

    mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
            "Lockres %s is on the blocked list\n",
            res->l_name);
    mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
            "Lockres %s has mask waiters pending\n",
            res->l_name);
    mlog_bug_on_msg(spin_is_locked(&res->l_lock),
            "Lockres %s is locked\n",
            res->l_name);
    mlog_bug_on_msg(res->l_ro_holders,
            "Lockres %s has %u ro holders\n",
            res->l_name, res->l_ro_holders);
    mlog_bug_on_msg(res->l_ex_holders,
            "Lockres %s has %u ex holders\n",
            res->l_name, res->l_ex_holders);

    /* Need to clear out the lock status block for the dlm */
    memset(&res->l_lksb, 0, sizeof(res->l_lksb));

    res->l_flags = 0UL;
}

/*
 * Keep a list of processes who have interest in a lockres.
 * Note: this is now only used to check for recursive cluster locking.
 */
static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
                   struct ocfs2_lock_holder *oh)
{
    INIT_LIST_HEAD(&oh->oh_list);
    oh->oh_owner_pid = get_pid(task_pid(current));

    spin_lock(&lockres->l_lock);
    list_add_tail(&oh->oh_list, &lockres->l_holders);
    spin_unlock(&lockres->l_lock);
}

static struct ocfs2_lock_holder *
ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
        struct pid *pid)
{
    struct ocfs2_lock_holder *oh;

    spin_lock(&lockres->l_lock);
    list_for_each_entry(oh, &lockres->l_holders, oh_list) {
        if (oh->oh_owner_pid == pid) {
            spin_unlock(&lockres->l_lock);
            return oh;
        }
    }
    spin_unlock(&lockres->l_lock);
    return NULL;
}

static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
                       struct ocfs2_lock_holder *oh)
{
    spin_lock(&lockres->l_lock);
    list_del(&oh->oh_list);
    spin_unlock(&lockres->l_lock);

    put_pid(oh->oh_owner_pid);
}
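
/*
 * Illustrative sketch (not part of the original file): how the holder
 * list detects recursive cluster locking.  A caller that may re-enter
 * checks for its own pid before taking the cluster lock again; the
 * real pattern lives in ocfs2_inode_lock_tracker() later in dlmglue.c.
 */
#if 0
    struct ocfs2_lock_holder oh;
    int had_lock;

    had_lock = (ocfs2_pid_holder(lockres, task_pid(current)) != NULL);
    if (!had_lock) {
        ocfs2_inode_lock(inode, NULL, 1); /* first acquisition: EX */
        ocfs2_add_holder(lockres, &oh);
    }
    /* ... nested code may safely re-enter this path ... */
    if (!had_lock) {
        ocfs2_remove_holder(lockres, &oh);
        ocfs2_inode_unlock(inode, 1);
    }
#endif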

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
                     int level)
{
    BUG_ON(!lockres);

    switch(level) {
    case DLM_LOCK_EX:
        lockres->l_ex_holders++;
        break;
    case DLM_LOCK_PR:
        lockres->l_ro_holders++;
        break;
    default:
        BUG();
    }
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
                     int level)
{
    BUG_ON(!lockres);

    switch(level) {
    case DLM_LOCK_EX:
        BUG_ON(!lockres->l_ex_holders);
        lockres->l_ex_holders--;
        break;
    case DLM_LOCK_PR:
        BUG_ON(!lockres->l_ro_holders);
        lockres->l_ro_holders--;
        break;
    default:
        BUG();
    }
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
    int new_level = DLM_LOCK_EX;

    if (level == DLM_LOCK_EX)
        new_level = DLM_LOCK_NL;
    else if (level == DLM_LOCK_PR)
        new_level = DLM_LOCK_PR;
    return new_level;
}
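
/*
 * Worked out, that gives the following table - for a given blocking
 * request, the highest level this node may keep holding:
 *
 *   blocking level   highest compatible level
 *   DLM_LOCK_EX      DLM_LOCK_NL   (writer needs exclusivity)
 *   DLM_LOCK_PR      DLM_LOCK_PR   (readers share)
 *   DLM_LOCK_NL      DLM_LOCK_EX   (NL blocks nothing)
 */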

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
                  unsigned long newflags)
{
    struct ocfs2_mask_waiter *mw, *tmp;

    assert_spin_locked(&lockres->l_lock);

    lockres->l_flags = newflags;

    list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
        if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
            continue;

        list_del_init(&mw->mw_item);
        mw->mw_status = 0;
        complete(&mw->mw_complete);
        ocfs2_track_lock_wait(lockres);
    }
}

static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
    lockres_set_flags(lockres, lockres->l_flags | or);
}

static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
                unsigned long clear)
{
    lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
    BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

    lockres->l_level = lockres->l_requested;
    if (lockres->l_level <=
        ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
        lockres->l_blocking = DLM_LOCK_NL;
        lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
    }
    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

    /* Convert from RO to EX doesn't really need anything as our
     * information is already up to date. Convert from NL to
     * *anything*, however, should mark ourselves as needing an
     * update */
    if (lockres->l_level == DLM_LOCK_NL &&
        lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
        lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

    lockres->l_level = lockres->l_requested;

    /*
     * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
     * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
     * downconverting the lock before the upconvert has fully completed.
     * Do not prevent the dc thread from downconverting if NONBLOCK lock
     * had already returned.
     */
    if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
        lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
    else
        lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);

    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
    BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
    BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

    if (lockres->l_requested > DLM_LOCK_NL &&
        !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
        lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
        lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

    lockres->l_level = lockres->l_requested;
    lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                     int level)
{
    int needs_downconvert = 0;

    assert_spin_locked(&lockres->l_lock);

    if (level > lockres->l_blocking) {
        /* only schedule a downconvert if we haven't already scheduled
         * one that goes low enough to satisfy the level we're
         * blocking.  this also catches the case where we get
         * duplicate BASTs */
        if (ocfs2_highest_compat_lock_level(level) <
            ocfs2_highest_compat_lock_level(lockres->l_blocking))
            needs_downconvert = 1;

        lockres->l_blocking = level;
    }

    mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
         lockres->l_name, level, lockres->l_level, lockres->l_blocking,
         needs_downconvert);

    if (needs_downconvert)
        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
    mlog(0, "needs_downconvert = %d\n", needs_downconvert);
    return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()       ocfs2_downconvert_thread()
 *     clear PENDING             ocfs2_unblock_lock()
 *                    take_l_lock
 *                    !BUSY
 *                    ocfs2_prepare_downconvert()
 *                     set BUSY
 *                     set PENDING
 *                    drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *          <window>
 *                    ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */
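
/*
 * Minimal sketch (not part of the original file) of the set/clear
 * pairing described above; ocfs2_lock_create() and
 * __ocfs2_cluster_lock() below follow exactly this shape.
 */
#if 0
    spin_lock_irqsave(&lockres->l_lock, flags);
    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
    gen = lockres_set_pending(lockres); /* remember our generation */
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb,
                 lkm_flags, lockres->l_name,
                 OCFS2_LOCK_ID_MAX_LEN - 1);

    /* only clears PENDING if no newer action has re-set it */
    lockres_clear_pending(lockres, gen, osb);
#endif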

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
                    unsigned int generation,
                    struct ocfs2_super *osb)
{
    assert_spin_locked(&lockres->l_lock);

    /*
     * The ast and locking functions can race us here.  The winner
     * will clear pending, the loser will not.
     */
    if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
        (lockres->l_pending_gen != generation))
        return;

    lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
    lockres->l_pending_gen++;

    /*
     * The downconvert thread may have skipped us because we
     * were PENDING.  Wake it up.
     */
    if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
        ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
                  unsigned int generation,
                  struct ocfs2_super *osb)
{
    unsigned long flags;

    spin_lock_irqsave(&lockres->l_lock, flags);
    __lockres_clear_pending(lockres, generation, osb);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
    assert_spin_locked(&lockres->l_lock);
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

    lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

    return lockres->l_pending_gen;
}

static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
    int needs_downconvert;
    unsigned long flags;

    BUG_ON(level <= DLM_LOCK_NL);

    mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
         "type %s\n", lockres->l_name, level, lockres->l_level,
         ocfs2_lock_type_string(lockres->l_type));

    /*
     * We can skip the bast for locks which don't enable caching -
     * they'll be dropped at the earliest possible time anyway.
     */
    if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
        return;

    spin_lock_irqsave(&lockres->l_lock, flags);
    needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
    if (needs_downconvert)
        ocfs2_schedule_blocked_lock(osb, lockres);
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    wake_up(&lockres->l_event);

    ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
    unsigned long flags;
    int status;

    spin_lock_irqsave(&lockres->l_lock, flags);

    status = ocfs2_dlm_lock_status(&lockres->l_lksb);

    if (status == -EAGAIN) {
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        goto out;
    }

    if (status) {
        mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
             lockres->l_name, status);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        return;
    }

    mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
         "level %d => %d\n", lockres->l_name, lockres->l_action,
         lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

    switch(lockres->l_action) {
    case OCFS2_AST_ATTACH:
        ocfs2_generic_handle_attach_action(lockres);
        lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
        break;
    case OCFS2_AST_CONVERT:
        ocfs2_generic_handle_convert_action(lockres);
        break;
    case OCFS2_AST_DOWNCONVERT:
        ocfs2_generic_handle_downconvert_action(lockres);
        break;
    default:
        mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
             "flags 0x%lx, unlock: %u\n",
             lockres->l_name, lockres->l_action, lockres->l_flags,
             lockres->l_unlock_action);
        BUG();
    }
out:
    /* set it to something invalid so if we get called again we
     * can catch it. */
    lockres->l_action = OCFS2_AST_INVALID;

    /* Did we try to cancel this lock?  Clear that state */
    if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

    /*
     * We may have beaten the locking functions here.  We certainly
     * know that dlm_lock() has been called :-)
     * Because we can't have two lock calls in flight at once, we
     * can use lockres->l_pending_gen.
     */
    __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

    wake_up(&lockres->l_event);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
    unsigned long flags;

    mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
         lockres->l_name, lockres->l_unlock_action);

    spin_lock_irqsave(&lockres->l_lock, flags);
    if (error) {
        mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
             "unlock_action %d\n", error, lockres->l_name,
             lockres->l_unlock_action);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        return;
    }

    switch(lockres->l_unlock_action) {
    case OCFS2_UNLOCK_CANCEL_CONVERT:
        mlog(0, "Cancel convert success for %s\n", lockres->l_name);
        lockres->l_action = OCFS2_AST_INVALID;
        /* Downconvert thread may have requeued this lock, we
         * need to wake it. */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
            ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
        break;
    case OCFS2_UNLOCK_DROP_LOCK:
        lockres->l_level = DLM_LOCK_IV;
        break;
    default:
        BUG();
    }

    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
    wake_up(&lockres->l_event);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
    .lp_max_version = {
        .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
        .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
    },
    .lp_lock_ast        = ocfs2_locking_ast,
    .lp_blocking_ast    = ocfs2_blocking_ast,
    .lp_unlock_ast      = ocfs2_unlock_ast,
};
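
/*
 * Illustrative sketch (not part of the original file) of the join rule
 * described above: equal major version, minor capped to the domain's.
 * The function below is hypothetical; the real negotiation happens in
 * the stack glue when joining the dlm domain.
 */
#if 0
static int example_proto_compatible(struct ocfs2_protocol_version *domain,
                    struct ocfs2_protocol_version *ours)
{
    if (ours->pv_major != domain->pv_major)
        return 0;   /* may not join at all */
    if (ours->pv_minor < domain->pv_minor)
        return 0;   /* too old to join */
    /* join, but run at the domain's (equal or smaller) minor */
    ours->pv_minor = domain->pv_minor;
    return 1;
}
#endif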

void ocfs2_set_locking_protocol(void)
{
    ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                        int convert)
{
    unsigned long flags;

    spin_lock_irqsave(&lockres->l_lock, flags);
    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
    if (convert)
        lockres->l_action = OCFS2_AST_INVALID;
    else
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    wake_up(&lockres->l_event);
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                 struct ocfs2_lock_res *lockres,
                 int level,
                 u32 dlm_flags)
{
    int ret = 0;
    unsigned long flags;
    unsigned int gen;

    mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
         dlm_flags);

    spin_lock_irqsave(&lockres->l_lock, flags);
    if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
        (lockres->l_flags & OCFS2_LOCK_BUSY)) {
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        goto bail;
    }

    lockres->l_action = OCFS2_AST_ATTACH;
    lockres->l_requested = level;
    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
    gen = lockres_set_pending(lockres);
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    ret = ocfs2_dlm_lock(osb->cconn,
                 level,
                 &lockres->l_lksb,
                 dlm_flags,
                 lockres->l_name,
                 OCFS2_LOCK_ID_MAX_LEN - 1);
    lockres_clear_pending(lockres, gen, osb);
    if (ret) {
        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
        ocfs2_recover_from_dlm_error(lockres, 1);
    }

    mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
    return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
                    int flag)
{
    unsigned long flags;
    int ret;

    spin_lock_irqsave(&lockres->l_lock, flags);
    ret = lockres->l_flags & flag;
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
    wait_event(lockres->l_event,
           !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
    wait_event(lockres->l_event,
           !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                             int wanted)
{
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

    return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
    INIT_LIST_HEAD(&mw->mw_item);
    init_completion(&mw->mw_complete);
    ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
    wait_for_completion(&mw->mw_complete);
    /* Re-arm the completion in case we want to wait on it again */
    reinit_completion(&mw->mw_complete);
    return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
                    struct ocfs2_mask_waiter *mw,
                    unsigned long mask,
                    unsigned long goal)
{
    BUG_ON(!list_empty(&mw->mw_item));

    assert_spin_locked(&lockres->l_lock);

    list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
    mw->mw_mask = mask;
    mw->mw_goal = goal;
    ocfs2_track_lock_wait(lockres);
}
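
/*
 * Minimal sketch (not part of the original file): waiting on a flag
 * transition with a mask waiter; __ocfs2_cluster_lock() below uses
 * this pattern to wait for OCFS2_LOCK_BUSY to clear.
 */
#if 0
    struct ocfs2_mask_waiter mw;

    ocfs2_init_mask_waiter(&mw);

    spin_lock_irqsave(&lockres->l_lock, flags);
    /* complete the waiter once (l_flags & OCFS2_LOCK_BUSY) == 0 */
    lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    ret = ocfs2_wait_for_mask(&mw);
#endif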
1418 
1419 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1420  * if the mask still hadn't reached its goal */
1421 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1422                       struct ocfs2_mask_waiter *mw)
1423 {
1424     int ret = 0;
1425 
1426     assert_spin_locked(&lockres->l_lock);
1427     if (!list_empty(&mw->mw_item)) {
1428         if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1429             ret = -EBUSY;
1430 
1431         list_del_init(&mw->mw_item);
1432         init_completion(&mw->mw_complete);
1433         ocfs2_track_lock_wait(lockres);
1434     }
1435 
1436     return ret;
1437 }
1438 
1439 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1440                       struct ocfs2_mask_waiter *mw)
1441 {
1442     unsigned long flags;
1443     int ret = 0;
1444 
1445     spin_lock_irqsave(&lockres->l_lock, flags);
1446     ret = __lockres_remove_mask_waiter(lockres, mw);
1447     spin_unlock_irqrestore(&lockres->l_lock, flags);
1448 
1449     return ret;
1450 
1451 }
1452 
1453 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1454                          struct ocfs2_lock_res *lockres)
1455 {
1456     int ret;
1457 
1458     ret = wait_for_completion_interruptible(&mw->mw_complete);
1459     if (ret)
1460         lockres_remove_mask_waiter(lockres, mw);
1461     else
1462         ret = mw->mw_status;
1463     /* Re-arm the completion in case we want to wait on it again */
1464     reinit_completion(&mw->mw_complete);
1465     return ret;
1466 }
1467 
1468 static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1469                 struct ocfs2_lock_res *lockres,
1470                 int level,
1471                 u32 lkm_flags,
1472                 int arg_flags,
1473                 int l_subclass,
1474                 unsigned long caller_ip)
1475 {
1476     struct ocfs2_mask_waiter mw;
1477     int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1478     int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1479     unsigned long flags;
1480     unsigned int gen;
1481     int noqueue_attempted = 0;
1482     int dlm_locked = 0;
1483     int kick_dc = 0;
1484 
1485     if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
1486         mlog_errno(-EINVAL);
1487         return -EINVAL;
1488     }
1489 
1490     ocfs2_init_mask_waiter(&mw);
1491 
1492     if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1493         lkm_flags |= DLM_LKF_VALBLK;
1494 
1495 again:
1496     wait = 0;
1497 
1498     spin_lock_irqsave(&lockres->l_lock, flags);
1499 
1500     if (catch_signals && signal_pending(current)) {
1501         ret = -ERESTARTSYS;
1502         goto unlock;
1503     }
1504 
1505     mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1506             "Cluster lock called on freeing lockres %s! flags "
1507             "0x%lx\n", lockres->l_name, lockres->l_flags);
1508 
1509     /* We only compare against the currently granted level
1510      * here. If the lock is blocked waiting on a downconvert,
1511      * we'll get caught below. */
1512     if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1513         level > lockres->l_level) {
1514         /* is someone sitting in dlm_lock? If so, wait on
1515          * them. */
1516         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1517         wait = 1;
1518         goto unlock;
1519     }
1520 
1521     if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1522         /*
1523          * We've upconverted. If the lock now has a level we can
1524          * work with, we take it. If, however, the lock is not at the
1525          * required level, we go thru the full cycle. One way this could
1526          * happen is if a process requesting an upconvert to PR is
1527          * closely followed by another requesting an upconvert to EX.
1528          * If the process requesting EX lands here, we want it to
1529          * continue attempting to upconvert and let the process
1530          * requesting PR take the lock.
1531          * If multiple processes request upconvert to PR, the first one
1532          * here will take the lock. The others will have to go thru the
1533          * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1534          * downconvert request.
1535          */
1536         if (level <= lockres->l_level)
1537             goto update_holders;
1538     }
1539 
1540     if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1541         !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1542         /* is the lock currently blocked on behalf of
1543          * another node? */
1544         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1545         wait = 1;
1546         goto unlock;
1547     }
1548 
1549     if (level > lockres->l_level) {
1550         if (noqueue_attempted > 0) {
1551             ret = -EAGAIN;
1552             goto unlock;
1553         }
1554         if (lkm_flags & DLM_LKF_NOQUEUE)
1555             noqueue_attempted = 1;
1556 
1557         if (lockres->l_action != OCFS2_AST_INVALID)
1558             mlog(ML_ERROR, "lockres %s has action %u pending\n",
1559                  lockres->l_name, lockres->l_action);
1560 
1561         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1562             lockres->l_action = OCFS2_AST_ATTACH;
1563             lkm_flags &= ~DLM_LKF_CONVERT;
1564         } else {
1565             lockres->l_action = OCFS2_AST_CONVERT;
1566             lkm_flags |= DLM_LKF_CONVERT;
1567         }
1568 
1569         lockres->l_requested = level;
1570         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1571         gen = lockres_set_pending(lockres);
1572         spin_unlock_irqrestore(&lockres->l_lock, flags);
1573 
1574         BUG_ON(level == DLM_LOCK_IV);
1575         BUG_ON(level == DLM_LOCK_NL);
1576 
1577         mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
1578              lockres->l_name, lockres->l_level, level);
1579 
1580         /* call dlm_lock to upgrade lock now */
1581         ret = ocfs2_dlm_lock(osb->cconn,
1582                      level,
1583                      &lockres->l_lksb,
1584                      lkm_flags,
1585                      lockres->l_name,
1586                      OCFS2_LOCK_ID_MAX_LEN - 1);
1587         lockres_clear_pending(lockres, gen, osb);
1588         if (ret) {
1589             if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1590                 (ret != -EAGAIN)) {
1591                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1592                             ret, lockres);
1593             }
1594             ocfs2_recover_from_dlm_error(lockres, 1);
1595             goto out;
1596         }
1597         dlm_locked = 1;
1598 
1599         mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1600              lockres->l_name);
1601 
1602         /* At this point we've gone inside the dlm and need to
1603          * complete our work regardless. */
1604         catch_signals = 0;
1605 
1606         /* wait for busy to clear and carry on */
1607         goto again;
1608     }
1609 
1610 update_holders:
1611     /* Ok, if we get here then we're good to go. */
1612     ocfs2_inc_holders(lockres, level);
1613 
1614     ret = 0;
1615 unlock:
1616     lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1617 
1618     /* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
1619     kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);
1620 
1621     spin_unlock_irqrestore(&lockres->l_lock, flags);
1622     if (kick_dc)
1623         ocfs2_wake_downconvert_thread(osb);
1624 out:
1625     /*
1626      * This is helping work around a lock inversion between the page lock
1627      * and dlm locks.  One path holds the page lock while calling aops
1628      * which block acquiring dlm locks.  The voting thread holds dlm
1629      * locks while acquiring page locks while down converting data locks.
1630      * This block is helping an aop path notice the inversion and back
1631      * off to unlock its page lock before trying the dlm lock again.
1632      */
1633     if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1634         mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1635         wait = 0;
1636         spin_lock_irqsave(&lockres->l_lock, flags);
1637         if (__lockres_remove_mask_waiter(lockres, &mw)) {
1638             if (dlm_locked)
1639                 lockres_or_flags(lockres,
1640                     OCFS2_LOCK_NONBLOCK_FINISHED);
1641             spin_unlock_irqrestore(&lockres->l_lock, flags);
1642             ret = -EAGAIN;
1643         } else {
1644             spin_unlock_irqrestore(&lockres->l_lock, flags);
1645             goto again;
1646         }
1647     }
1648     if (wait) {
1649         ret = ocfs2_wait_for_mask(&mw);
1650         if (ret == 0)
1651             goto again;
1652         mlog_errno(ret);
1653     }
1654     ocfs2_update_lock_stats(lockres, level, &mw, ret);
1655 
1656 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1657     if (!ret && lockres->l_lockdep_map.key != NULL) {
1658         if (level == DLM_LOCK_PR)
1659             rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1660                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1661                 caller_ip);
1662         else
1663             rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1664                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1665                 caller_ip);
1666     }
1667 #endif
1668     return ret;
1669 }
1670 
1671 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1672                      struct ocfs2_lock_res *lockres,
1673                      int level,
1674                      u32 lkm_flags,
1675                      int arg_flags)
1676 {
1677     return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1678                     0, _RET_IP_);
1679 }
1680 
1681 
1682 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1683                    struct ocfs2_lock_res *lockres,
1684                    int level,
1685                    unsigned long caller_ip)
1686 {
1687     unsigned long flags;
1688 
1689     spin_lock_irqsave(&lockres->l_lock, flags);
1690     ocfs2_dec_holders(lockres, level);
1691     ocfs2_downconvert_on_unlock(osb, lockres);
1692     spin_unlock_irqrestore(&lockres->l_lock, flags);
1693 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1694     if (lockres->l_lockdep_map.key != NULL)
1695         rwsem_release(&lockres->l_lockdep_map, caller_ip);
1696 #endif
1697 }
1698 
1699 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1700                  struct ocfs2_lock_res *lockres,
1701                  int ex,
1702                  int local)
1703 {
1704     int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1705     unsigned long flags;
1706     u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1707 
1708     spin_lock_irqsave(&lockres->l_lock, flags);
1709     BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1710     lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1711     spin_unlock_irqrestore(&lockres->l_lock, flags);
1712 
1713     return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1714 }
1715 
1716 /* Grants us an EX lock on the data and metadata resources, skipping
1717  * the normal cluster directory lookup. Use this ONLY on newly created
1718  * inodes which other nodes can't possibly see, and which haven't been
1719  * hashed in the inode hash yet. This can give us a good performance
1720  * increase as it'll skip the network broadcast normally associated
1721  * with creating a new lock resource. */
1722 int ocfs2_create_new_inode_locks(struct inode *inode)
1723 {
1724     int ret;
1725     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1726 
1727     BUG_ON(!ocfs2_inode_is_new(inode));
1728 
1729     mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1730 
1731     /* NOTE that we don't increment any of the holder counts, nor
1732      * do we add anything to a journal handle. Since this is
1733      * supposed to be a new inode which the cluster doesn't know
1734      * about yet, there is no need to.  As far as the LVB handling
1735      * is concerned, this is basically like acquiring an EX lock
1736      * on a resource which has an invalid one -- we'll set it
1737      * valid when we release the EX. */
1738 
1739     ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1740     if (ret) {
1741         mlog_errno(ret);
1742         goto bail;
1743     }
1744 
1745     /*
1746      * We don't want to use DLM_LKF_LOCAL on a metadata lock as they
1747      * don't use a generation in their lock names.
1748      */
1749     ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1750     if (ret) {
1751         mlog_errno(ret);
1752         goto bail;
1753     }
1754 
1755     ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1756     if (ret)
1757         mlog_errno(ret);
1758 
1759 bail:
1760     return ret;
1761 }
1762 
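/*
 * Editor's note: a hedged sketch (not in the original file) of where the
 * helper above would sit in an inode-creation path; the allocation and
 * journal steps are elided and this wrapper is hypothetical.  The ordering
 * is the point: the DLM_LKF_LOCAL locks must be created *before* the inode
 * is hashed or otherwise made visible to other nodes.
 */
static int example_new_inode_setup(struct inode *inode)
{
	int status;

	/* ... inode allocated and initialized, not yet hashed ... */
	status = ocfs2_create_new_inode_locks(inode);
	if (status < 0)
		mlog_errno(status);
	/* ... only now hash the inode / publish it to the cluster ... */
	return status;
}
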
1763 int ocfs2_rw_lock(struct inode *inode, int write)
1764 {
1765     int status, level;
1766     struct ocfs2_lock_res *lockres;
1767     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1768 
1769     mlog(0, "inode %llu take %s RW lock\n",
1770          (unsigned long long)OCFS2_I(inode)->ip_blkno,
1771          write ? "EXMODE" : "PRMODE");
1772 
1773     if (ocfs2_mount_local(osb))
1774         return 0;
1775 
1776     lockres = &OCFS2_I(inode)->ip_rw_lockres;
1777 
1778     level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1779 
1780     status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1781     if (status < 0)
1782         mlog_errno(status);
1783 
1784     return status;
1785 }
1786 
1787 int ocfs2_try_rw_lock(struct inode *inode, int write)
1788 {
1789     int status, level;
1790     struct ocfs2_lock_res *lockres;
1791     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1792 
1793     mlog(0, "inode %llu try to take %s RW lock\n",
1794          (unsigned long long)OCFS2_I(inode)->ip_blkno,
1795          write ? "EXMODE" : "PRMODE");
1796 
1797     if (ocfs2_mount_local(osb))
1798         return 0;
1799 
1800     lockres = &OCFS2_I(inode)->ip_rw_lockres;
1801 
1802     level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1803 
1804     status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1805     return status;
1806 }
1807 
1808 void ocfs2_rw_unlock(struct inode *inode, int write)
1809 {
1810     int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1811     struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1812     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1813 
1814     mlog(0, "inode %llu drop %s RW lock\n",
1815          (unsigned long long)OCFS2_I(inode)->ip_blkno,
1816          write ? "EXMODE" : "PRMODE");
1817 
1818     if (!ocfs2_mount_local(osb))
1819         ocfs2_cluster_unlock(osb, lockres, level);
1820 }
1821 
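/*
 * Editor's note: hedged usage sketch (not in the original file).  The RW
 * lock brackets file I/O: writers take EXMODE, readers PRMODE, and the
 * same level must be passed to the unlock.  The function name is
 * hypothetical.
 */
static int example_write_path(struct inode *inode)
{
	int status;

	status = ocfs2_rw_lock(inode, 1);	/* 1 => write => EXMODE */
	if (status < 0)
		return status;

	/* ... perform the write under the cluster RW lock ... */

	ocfs2_rw_unlock(inode, 1);
	return 0;
}
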
1822 /*
1823  * ocfs2_open_lock always gets a PR mode lock.
1824  */
1825 int ocfs2_open_lock(struct inode *inode)
1826 {
1827     int status = 0;
1828     struct ocfs2_lock_res *lockres;
1829     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1830 
1831     mlog(0, "inode %llu take PRMODE open lock\n",
1832          (unsigned long long)OCFS2_I(inode)->ip_blkno);
1833 
1834     if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1835         goto out;
1836 
1837     lockres = &OCFS2_I(inode)->ip_open_lockres;
1838 
1839     status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0);
1840     if (status < 0)
1841         mlog_errno(status);
1842 
1843 out:
1844     return status;
1845 }
1846 
1847 int ocfs2_try_open_lock(struct inode *inode, int write)
1848 {
1849     int status = 0, level;
1850     struct ocfs2_lock_res *lockres;
1851     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1852 
1853     mlog(0, "inode %llu try to take %s open lock\n",
1854          (unsigned long long)OCFS2_I(inode)->ip_blkno,
1855          write ? "EXMODE" : "PRMODE");
1856 
1857     if (ocfs2_is_hard_readonly(osb)) {
1858         if (write)
1859             status = -EROFS;
1860         goto out;
1861     }
1862 
1863     if (ocfs2_mount_local(osb))
1864         goto out;
1865 
1866     lockres = &OCFS2_I(inode)->ip_open_lockres;
1867 
1868     level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1869 
1870     /*
1871      * The file system may already be holding a PRMODE/EXMODE open lock.
1872      * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1873      * other nodes, and -EAGAIN will indicate to the caller that
1874      * this inode is still in use.
1875      */
1876     status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1877 
1878 out:
1879     return status;
1880 }
1881 
1882 /*
1883  * ocfs2_open_unlock unlocks PR and EX mode open locks.
1884  */
1885 void ocfs2_open_unlock(struct inode *inode)
1886 {
1887     struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1888     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1889 
1890     mlog(0, "inode %llu drop open lock\n",
1891          (unsigned long long)OCFS2_I(inode)->ip_blkno);
1892 
1893     if (ocfs2_mount_local(osb))
1894         goto out;
1895 
1896     if (lockres->l_ro_holders)
1897         ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR);
1898     if (lockres->l_ex_holders)
1899         ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
1900 
1901 out:
1902     return;
1903 }
1904 
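/*
 * Editor's note: a hedged sketch (not in the original file) of the
 * "is this inode still open anywhere?" check that the open lock enables;
 * the wrapper name is hypothetical.  Every node with the inode open holds
 * the open lock at PR, so a NOQUEUE EX request fails with -EAGAIN while
 * any other node still has it open.
 */
static int example_inode_unused_clusterwide(struct inode *inode)
{
	int status = ocfs2_try_open_lock(inode, 1);	/* try for EX */

	if (status == -EAGAIN)
		return 0;	/* another node still has it open */
	if (status < 0)
		return status;	/* error */
	/* EX granted (and still held): no other opener exists */
	return 1;
}
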
1905 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1906                      int level)
1907 {
1908     int ret;
1909     struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1910     unsigned long flags;
1911     struct ocfs2_mask_waiter mw;
1912 
1913     ocfs2_init_mask_waiter(&mw);
1914 
1915 retry_cancel:
1916     spin_lock_irqsave(&lockres->l_lock, flags);
1917     if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1918         ret = ocfs2_prepare_cancel_convert(osb, lockres);
1919         if (ret) {
1920             spin_unlock_irqrestore(&lockres->l_lock, flags);
1921             ret = ocfs2_cancel_convert(osb, lockres);
1922             if (ret < 0) {
1923                 mlog_errno(ret);
1924                 goto out;
1925             }
1926             goto retry_cancel;
1927         }
1928         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1929         spin_unlock_irqrestore(&lockres->l_lock, flags);
1930 
1931         ocfs2_wait_for_mask(&mw);
1932         goto retry_cancel;
1933     }
1934 
1935     ret = -ERESTARTSYS;
1936     /*
1937      * We may still have gotten the lock, in which case there's no
1938      * point to restarting the syscall.
1939      */
1940     if (lockres->l_level == level)
1941         ret = 0;
1942 
1943     mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1944          lockres->l_flags, lockres->l_level, lockres->l_action);
1945 
1946     spin_unlock_irqrestore(&lockres->l_lock, flags);
1947 
1948 out:
1949     return ret;
1950 }
1951 
1952 /*
1953  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1954  * flock() calls. The locking approach this requires is sufficiently
1955  * different from all other cluster lock types that we implement a
1956  * separate path to the "low-level" dlm calls. In particular:
1957  *
1958  * - No optimization of lock levels is done - we take exactly
1959  *   what's been requested.
1960  *
1961  * - No lock caching is employed. We immediately downconvert to
1962  *   no-lock at unlock time. This also means flock locks never go on
1963  *   the blocking list.
1964  *
1965  * - Since userspace can trivially deadlock itself with flock, we make
1966  *   sure to allow cancellation of a misbehaving application's flock()
1967  *   request.
1968  *
1969  * - Access to any flock lockres doesn't require concurrency, so we
1970  *   can simplify the code by requiring the caller to guarantee
1971  *   serialization of dlmglue flock calls.
1972  */
1973 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1974 {
1975     int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1976     unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1977     unsigned long flags;
1978     struct ocfs2_file_private *fp = file->private_data;
1979     struct ocfs2_lock_res *lockres = &fp->fp_flock;
1980     struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1981     struct ocfs2_mask_waiter mw;
1982 
1983     ocfs2_init_mask_waiter(&mw);
1984 
1985     if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1986         (lockres->l_level > DLM_LOCK_NL)) {
1987         mlog(ML_ERROR,
1988              "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1989              "level: %u\n", lockres->l_name, lockres->l_flags,
1990              lockres->l_level);
1991         return -EINVAL;
1992     }
1993 
1994     spin_lock_irqsave(&lockres->l_lock, flags);
1995     if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1996         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1997         spin_unlock_irqrestore(&lockres->l_lock, flags);
1998 
1999         /*
2000          * Get the lock at NLMODE to start - that way we
2001          * can cancel the upconvert request if need be.
2002          */
2003         ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
2004         if (ret < 0) {
2005             mlog_errno(ret);
2006             goto out;
2007         }
2008 
2009         ret = ocfs2_wait_for_mask(&mw);
2010         if (ret) {
2011             mlog_errno(ret);
2012             goto out;
2013         }
2014         spin_lock_irqsave(&lockres->l_lock, flags);
2015     }
2016 
2017     lockres->l_action = OCFS2_AST_CONVERT;
2018     lkm_flags |= DLM_LKF_CONVERT;
2019     lockres->l_requested = level;
2020     lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2021 
2022     lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2023     spin_unlock_irqrestore(&lockres->l_lock, flags);
2024 
2025     ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
2026                  lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
2027     if (ret) {
2028         if (!trylock || (ret != -EAGAIN)) {
2029             ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2030             ret = -EINVAL;
2031         }
2032 
2033         ocfs2_recover_from_dlm_error(lockres, 1);
2034         lockres_remove_mask_waiter(lockres, &mw);
2035         goto out;
2036     }
2037 
2038     ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
2039     if (ret == -ERESTARTSYS) {
2040         /*
2041          * Userspace can deadlock itself with
2042          * flock(). Current behavior locally is to allow the
2043          * deadlock, but abort the system call if a signal is
2044          * received. We follow this example; otherwise a
2045          * poorly written program could sit in the kernel until
2046          * reboot.
2047          *
2048          * Handling this is a bit more complicated for Ocfs2
2049          * though. We can't exit this function with an
2050          * outstanding lock request, so a cancel convert is
2051          * required. We intentionally overwrite 'ret' - if the
2052          * cancel fails and the lock was granted, it's easier
2053          * to just bubble success back up to the user.
2054          */
2055         ret = ocfs2_flock_handle_signal(lockres, level);
2056     } else if (!ret && (level > lockres->l_level)) {
2057         /* Trylock failed asynchronously */
2058         BUG_ON(!trylock);
2059         ret = -EAGAIN;
2060     }
2061 
2062 out:
2063 
2064     mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
2065          lockres->l_name, ex, trylock, ret);
2066     return ret;
2067 }
2068 
2069 void ocfs2_file_unlock(struct file *file)
2070 {
2071     int ret;
2072     unsigned int gen;
2073     unsigned long flags;
2074     struct ocfs2_file_private *fp = file->private_data;
2075     struct ocfs2_lock_res *lockres = &fp->fp_flock;
2076     struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2077     struct ocfs2_mask_waiter mw;
2078 
2079     ocfs2_init_mask_waiter(&mw);
2080 
2081     if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2082         return;
2083 
2084     if (lockres->l_level == DLM_LOCK_NL)
2085         return;
2086 
2087     mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2088          lockres->l_name, lockres->l_flags, lockres->l_level,
2089          lockres->l_action);
2090 
2091     spin_lock_irqsave(&lockres->l_lock, flags);
2092     /*
2093      * Fake a blocking ast for the downconvert code.
2094      */
2095     lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2096     lockres->l_blocking = DLM_LOCK_EX;
2097 
2098     gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2099     lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2100     spin_unlock_irqrestore(&lockres->l_lock, flags);
2101 
2102     ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2103     if (ret) {
2104         mlog_errno(ret);
2105         return;
2106     }
2107 
2108     ret = ocfs2_wait_for_mask(&mw);
2109     if (ret)
2110         mlog_errno(ret);
2111 }
2112 
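/*
 * Editor's note: hedged sketch of how an flock(2) handler might map onto
 * the pair above (the real glue lives outside this file; the details here
 * are illustrative only).  Serializing these calls is the caller's job,
 * per the comment block preceding ocfs2_file_lock().
 */
static int example_flock(struct file *file, struct file_lock *fl)
{
	int ex = (fl->fl_type == F_WRLCK);
	int trylock = !(fl->fl_flags & FL_SLEEP);

	if (fl->fl_type == F_UNLCK) {
		ocfs2_file_unlock(file);
		return 0;
	}
	return ocfs2_file_lock(file, ex, trylock);
}
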
2113 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2114                     struct ocfs2_lock_res *lockres)
2115 {
2116     int kick = 0;
2117 
2118     /* If we know that another node is waiting on our lock, kick
2119      * the downconvert thread pre-emptively when we reach a release
2120      * condition. */
2121     if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2122         switch (lockres->l_blocking) {
2123         case DLM_LOCK_EX:
2124             if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2125                 kick = 1;
2126             break;
2127         case DLM_LOCK_PR:
2128             if (!lockres->l_ex_holders)
2129                 kick = 1;
2130             break;
2131         default:
2132             BUG();
2133         }
2134     }
2135 
2136     if (kick)
2137         ocfs2_wake_downconvert_thread(osb);
2138 }
2139 
2140 #define OCFS2_SEC_BITS   34
2141 #define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
2142 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
2143 
2144 /* LVB only has room for 64 bits of time here so we pack it for
2145  * now. */
2146 static u64 ocfs2_pack_timespec(struct timespec64 *spec)
2147 {
2148     u64 res;
2149     u64 sec = clamp_t(time64_t, spec->tv_sec, 0, 0x3ffffffffull);
2150     u32 nsec = spec->tv_nsec;
2151 
2152     res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2153 
2154     return res;
2155 }
2156 
2157 /* Call this with the lockres locked. I am reasonably sure we don't
2158  * need ip_lock in this function as anyone who would be changing those
2159  * values is supposed to be blocked in ocfs2_inode_lock right now. */
2160 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2161 {
2162     struct ocfs2_inode_info *oi = OCFS2_I(inode);
2163     struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2164     struct ocfs2_meta_lvb *lvb;
2165 
2166     lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2167 
2168     /*
2169      * Invalidate the LVB of a deleted inode - this way other
2170      * nodes are forced to go to disk and discover the new inode
2171      * status.
2172      */
2173     if (oi->ip_flags & OCFS2_INODE_DELETED) {
2174         lvb->lvb_version = 0;
2175         goto out;
2176     }
2177 
2178     lvb->lvb_version   = OCFS2_LVB_VERSION;
2179     lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
2180     lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2181     lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
2182     lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
2183     lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2184     lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2185     lvb->lvb_iatime_packed  =
2186         cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2187     lvb->lvb_ictime_packed =
2188         cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2189     lvb->lvb_imtime_packed =
2190         cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2191     lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2192     lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2193     lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2194 
2195 out:
2196     mlog_meta_lvb(0, lockres);
2197 }
2198 
2199 static void ocfs2_unpack_timespec(struct timespec64 *spec,
2200                   u64 packed_time)
2201 {
2202     spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2203     spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2204 }
2205 
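/*
 * Editor's note: a worked example of the packing above (not in the
 * original file).  The 64-bit LVB slot is split 34/30: the seconds take
 * the top OCFS2_SEC_BITS (34) bits, clamped to [0, 2^34 - 1], and the
 * nanoseconds sit in the low OCFS2_SEC_SHIFT (30) bits, which is enough
 * since tv_nsec < 10^9 < 2^30.
 */
static void example_timespec_roundtrip(void)
{
	struct timespec64 in = { .tv_sec = 1700000000, .tv_nsec = 123456789 };
	struct timespec64 out;

	ocfs2_unpack_timespec(&out, ocfs2_pack_timespec(&in));
	/* out.tv_sec == 1700000000 && out.tv_nsec == 123456789 */
}
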
2206 static int ocfs2_refresh_inode_from_lvb(struct inode *inode)
2207 {
2208     struct ocfs2_inode_info *oi = OCFS2_I(inode);
2209     struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2210     struct ocfs2_meta_lvb *lvb;
2211 
2212     mlog_meta_lvb(0, lockres);
2213 
2214     lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2215     if (inode_wrong_type(inode, be16_to_cpu(lvb->lvb_imode)))
2216         return -ESTALE;
2217 
2218     /* We're safe here without the lockres lock... */
2219     spin_lock(&oi->ip_lock);
2220     oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2221     i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2222 
2223     oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2224     oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2225     ocfs2_set_inode_flags(inode);
2226 
2227     /* fast-symlinks are a special case */
2228     if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2229         inode->i_blocks = 0;
2230     else
2231         inode->i_blocks = ocfs2_inode_sector_count(inode);
2232 
2233     i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
2234     i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
2235     inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2236     set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
2237     ocfs2_unpack_timespec(&inode->i_atime,
2238                   be64_to_cpu(lvb->lvb_iatime_packed));
2239     ocfs2_unpack_timespec(&inode->i_mtime,
2240                   be64_to_cpu(lvb->lvb_imtime_packed));
2241     ocfs2_unpack_timespec(&inode->i_ctime,
2242                   be64_to_cpu(lvb->lvb_ictime_packed));
2243     spin_unlock(&oi->ip_lock);
2244     return 0;
2245 }
2246 
2247 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2248                           struct ocfs2_lock_res *lockres)
2249 {
2250     struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2251 
2252     if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2253         && lvb->lvb_version == OCFS2_LVB_VERSION
2254         && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2255         return 1;
2256     return 0;
2257 }
2258 
2259 /* Determine whether a lock resource needs to be refreshed, and
2260  * arbitrate who gets to refresh it.
2261  *
2262  *   0 means no refresh needed.
2263  *
2264  *   > 0 means you need to refresh this and you MUST call
2265  *   ocfs2_complete_lock_res_refresh afterwards. */
2266 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2267 {
2268     unsigned long flags;
2269     int status = 0;
2270 
2271 refresh_check:
2272     spin_lock_irqsave(&lockres->l_lock, flags);
2273     if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2274         spin_unlock_irqrestore(&lockres->l_lock, flags);
2275         goto bail;
2276     }
2277 
2278     if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2279         spin_unlock_irqrestore(&lockres->l_lock, flags);
2280 
2281         ocfs2_wait_on_refreshing_lock(lockres);
2282         goto refresh_check;
2283     }
2284 
2285     /* Ok, I'll be the one to refresh this lock. */
2286     lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2287     spin_unlock_irqrestore(&lockres->l_lock, flags);
2288 
2289     status = 1;
2290 bail:
2291     mlog(0, "status %d\n", status);
2292     return status;
2293 }
2294 
2295 /* If status is non-zero, I'll mark it as not being in refresh
2296  * anymore, but I won't clear the needs-refresh flag. */
2297 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2298                            int status)
2299 {
2300     unsigned long flags;
2301 
2302     spin_lock_irqsave(&lockres->l_lock, flags);
2303     lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2304     if (!status)
2305         lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2306     spin_unlock_irqrestore(&lockres->l_lock, flags);
2307 
2308     wake_up(&lockres->l_event);
2309 }
2310 
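/*
 * Editor's note: condensed sketch of the refresh protocol the two helpers
 * above implement (ocfs2_super_lock() further down is a real user; this
 * wrapper is hypothetical).  At most one caller wins the right to refresh;
 * everyone else blocks in ocfs2_should_refresh_lock_res() until the winner
 * calls ocfs2_complete_lock_res_refresh().
 */
static int example_refresh_protocol(struct ocfs2_lock_res *lockres)
{
	int status = 0;

	if (ocfs2_should_refresh_lock_res(lockres)) {
		/* ... re-read the state this lock covers, set status ... */
		/* a non-zero status leaves OCFS2_LOCK_NEEDS_REFRESH set */
		ocfs2_complete_lock_res_refresh(lockres, status);
	}
	return status;
}
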
2311 /* may or may not return a bh if it went to disk. */
2312 static int ocfs2_inode_lock_update(struct inode *inode,
2313                   struct buffer_head **bh)
2314 {
2315     int status = 0;
2316     struct ocfs2_inode_info *oi = OCFS2_I(inode);
2317     struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2318     struct ocfs2_dinode *fe;
2319     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2320 
2321     if (ocfs2_mount_local(osb))
2322         goto bail;
2323 
2324     spin_lock(&oi->ip_lock);
2325     if (oi->ip_flags & OCFS2_INODE_DELETED) {
2326         mlog(0, "Orphaned inode %llu was deleted while we "
2327              "were waiting on a lock. ip_flags = 0x%x\n",
2328              (unsigned long long)oi->ip_blkno, oi->ip_flags);
2329         spin_unlock(&oi->ip_lock);
2330         status = -ENOENT;
2331         goto bail;
2332     }
2333     spin_unlock(&oi->ip_lock);
2334 
2335     if (!ocfs2_should_refresh_lock_res(lockres))
2336         goto bail;
2337 
2338     /* This will discard any caching information we might have had
2339      * for the inode metadata. */
2340     ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2341 
2342     ocfs2_extent_map_trunc(inode, 0);
2343 
2344     if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2345         mlog(0, "Trusting LVB on inode %llu\n",
2346              (unsigned long long)oi->ip_blkno);
2347         status = ocfs2_refresh_inode_from_lvb(inode);
2348         goto bail_refresh;
2349     } else {
2350         /* Boo, we have to go to disk. */
2351         /* read bh, cast, ocfs2_refresh_inode */
2352         status = ocfs2_read_inode_block(inode, bh);
2353         if (status < 0) {
2354             mlog_errno(status);
2355             goto bail_refresh;
2356         }
2357         fe = (struct ocfs2_dinode *) (*bh)->b_data;
2358         if (inode_wrong_type(inode, le16_to_cpu(fe->i_mode))) {
2359             status = -ESTALE;
2360             goto bail_refresh;
2361         }
2362 
2363         /* This is a good chance to make sure we're not
2364          * locking an invalid object.  ocfs2_read_inode_block()
2365          * already checked that the inode block is sane.
2366          *
2367          * We bug on a stale inode here because we checked
2368          * above whether it was wiped from disk. The wiping
2369          * node provides a guarantee that we receive that
2370          * message and can mark the inode before dropping any
2371          * locks associated with it. */
2372         mlog_bug_on_msg(inode->i_generation !=
2373                 le32_to_cpu(fe->i_generation),
2374                 "Invalid dinode %llu disk generation: %u "
2375                 "inode->i_generation: %u\n",
2376                 (unsigned long long)oi->ip_blkno,
2377                 le32_to_cpu(fe->i_generation),
2378                 inode->i_generation);
2379         mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2380                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2381                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
2382                 (unsigned long long)oi->ip_blkno,
2383                 (unsigned long long)le64_to_cpu(fe->i_dtime),
2384                 le32_to_cpu(fe->i_flags));
2385 
2386         ocfs2_refresh_inode(inode, fe);
2387         ocfs2_track_lock_refresh(lockres);
2388     }
2389 
2390     status = 0;
2391 bail_refresh:
2392     ocfs2_complete_lock_res_refresh(lockres, status);
2393 bail:
2394     return status;
2395 }
2396 
2397 static int ocfs2_assign_bh(struct inode *inode,
2398                struct buffer_head **ret_bh,
2399                struct buffer_head *passed_bh)
2400 {
2401     int status;
2402 
2403     if (passed_bh) {
2404         /* Ok, the update went to disk for us, use the
2405          * returned bh. */
2406         *ret_bh = passed_bh;
2407         get_bh(*ret_bh);
2408 
2409         return 0;
2410     }
2411 
2412     status = ocfs2_read_inode_block(inode, ret_bh);
2413     if (status < 0)
2414         mlog_errno(status);
2415 
2416     return status;
2417 }
2418 
2419 /*
2420  * returns < 0 error if the callback will never be called, otherwise
2421  * the result of the lock will be communicated via the callback.
2422  */
2423 int ocfs2_inode_lock_full_nested(struct inode *inode,
2424                  struct buffer_head **ret_bh,
2425                  int ex,
2426                  int arg_flags,
2427                  int subclass)
2428 {
2429     int status, level, acquired;
2430     u32 dlm_flags;
2431     struct ocfs2_lock_res *lockres = NULL;
2432     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2433     struct buffer_head *local_bh = NULL;
2434 
2435     mlog(0, "inode %llu, take %s META lock\n",
2436          (unsigned long long)OCFS2_I(inode)->ip_blkno,
2437          ex ? "EXMODE" : "PRMODE");
2438 
2439     status = 0;
2440     acquired = 0;
2441     /* We'll allow faking a readonly metadata lock for
2442      * rodevices. */
2443     if (ocfs2_is_hard_readonly(osb)) {
2444         if (ex)
2445             status = -EROFS;
2446         goto getbh;
2447     }
2448 
2449     if ((arg_flags & OCFS2_META_LOCK_GETBH) ||
2450         ocfs2_mount_local(osb))
2451         goto update;
2452 
2453     if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2454         ocfs2_wait_for_recovery(osb);
2455 
2456     lockres = &OCFS2_I(inode)->ip_inode_lockres;
2457     level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2458     dlm_flags = 0;
2459     if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2460         dlm_flags |= DLM_LKF_NOQUEUE;
2461 
2462     status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2463                       arg_flags, subclass, _RET_IP_);
2464     if (status < 0) {
2465         if (status != -EAGAIN)
2466             mlog_errno(status);
2467         goto bail;
2468     }
2469 
2470     /* Notify the error cleanup path to drop the cluster lock. */
2471     acquired = 1;
2472 
2473     /* We wait twice because a node may have died while we were in
2474      * the lower dlm layers. The second time though, we've
2475      * committed to owning this lock so we don't allow signals to
2476      * abort the operation. */
2477     if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2478         ocfs2_wait_for_recovery(osb);
2479 
2480 update:
2481     /*
2482      * We only see this flag if we're being called from
2483      * ocfs2_read_locked_inode(). It means we're locking an inode
2484      * which hasn't been populated yet, so clear the refresh flag
2485      * and let the caller handle it.
2486      */
2487     if (inode->i_state & I_NEW) {
2488         status = 0;
2489         if (lockres)
2490             ocfs2_complete_lock_res_refresh(lockres, 0);
2491         goto bail;
2492     }
2493 
2494     /* This is fun. The caller may want a bh back, or it may
2495      * not. ocfs2_inode_lock_update definitely wants one in, but
2496      * may or may not read one, depending on what's in the
2497      * LVB. The result of all of this is that we've *only* gone to
2498      * disk if we have to, so the complexity is worthwhile. */
2499     status = ocfs2_inode_lock_update(inode, &local_bh);
2500     if (status < 0) {
2501         if (status != -ENOENT)
2502             mlog_errno(status);
2503         goto bail;
2504     }
2505 getbh:
2506     if (ret_bh) {
2507         status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2508         if (status < 0) {
2509             mlog_errno(status);
2510             goto bail;
2511         }
2512     }
2513 
2514 bail:
2515     if (status < 0) {
2516         if (ret_bh && (*ret_bh)) {
2517             brelse(*ret_bh);
2518             *ret_bh = NULL;
2519         }
2520         if (acquired)
2521             ocfs2_inode_unlock(inode, ex);
2522     }
2523 
2524     brelse(local_bh);
2525     return status;
2526 }
2527 
2528 /*
2529  * This is working around a lock inversion between tasks acquiring DLM
2530  * locks while holding a page lock and the downconvert thread which
2531  * blocks dlm lock acquiry while acquiring page locks.
2532  *
2533  * ** These _with_page variants are only intended to be called from aop
2534  * methods that hold page locks and return a very specific *positive* error
2535  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2536  *
2537  * The DLM is called such that it returns -EAGAIN if it would have
2538  * blocked waiting for the downconvert thread.  In that case we unlock
2539  * our page so the downconvert thread can make progress.  Once we've
2540  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2541  * that called us can bubble that back up into the VFS who will then
2542  * immediately retry the aop call.
2543  */
2544 int ocfs2_inode_lock_with_page(struct inode *inode,
2545                   struct buffer_head **ret_bh,
2546                   int ex,
2547                   struct page *page)
2548 {
2549     int ret;
2550 
2551     ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2552     if (ret == -EAGAIN) {
2553         unlock_page(page);
2554         /*
2555          * If we can't get inode lock immediately, we should not return
2556          * directly here, since this will lead to a softlockup problem.
2557          * The method is to get a blocking lock and immediately unlock
2558          * before returning, this can avoid CPU resource waste due to
2559          * lots of retries, and benefits fairness in getting the lock.
2560          */
2561         if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2562             ocfs2_inode_unlock(inode, ex);
2563         ret = AOP_TRUNCATED_PAGE;
2564     }
2565 
2566     return ret;
2567 }
2568 
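/*
 * Editor's note: hedged sketch of the aop-side contract described above
 * (real callers live in the address_space operations; this one is
 * illustrative).  The key point is that AOP_TRUNCATED_PAGE is *positive*
 * and must be returned to the VFS as-is so it retries the aop call.
 */
static int example_readpage(struct file *file, struct page *page)
{
	struct inode *inode = page->mapping->host;
	int ret;

	ret = ocfs2_inode_lock_with_page(inode, NULL, 0, page);
	if (ret != 0)
		return ret;	/* -errno, or AOP_TRUNCATED_PAGE */

	/* ... fill the page; it is still locked at this point ... */

	ocfs2_inode_unlock(inode, 0);
	unlock_page(page);
	return 0;
}
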
2569 int ocfs2_inode_lock_atime(struct inode *inode,
2570               struct vfsmount *vfsmnt,
2571               int *level, int wait)
2572 {
2573     int ret;
2574 
2575     if (wait)
2576         ret = ocfs2_inode_lock(inode, NULL, 0);
2577     else
2578         ret = ocfs2_try_inode_lock(inode, NULL, 0);
2579 
2580     if (ret < 0) {
2581         if (ret != -EAGAIN)
2582             mlog_errno(ret);
2583         return ret;
2584     }
2585 
2586     /*
2587      * If we should update atime, we will get an EX lock,
2588      * otherwise we just get a PR lock.
2589      */
2590     if (ocfs2_should_update_atime(inode, vfsmnt)) {
2591         struct buffer_head *bh = NULL;
2592 
2593         ocfs2_inode_unlock(inode, 0);
2594         if (wait)
2595             ret = ocfs2_inode_lock(inode, &bh, 1);
2596         else
2597             ret = ocfs2_try_inode_lock(inode, &bh, 1);
2598 
2599         if (ret < 0) {
2600             if (ret != -EAGAIN)
2601                 mlog_errno(ret);
2602             return ret;
2603         }
2604         *level = 1;
2605         if (ocfs2_should_update_atime(inode, vfsmnt))
2606             ocfs2_update_inode_atime(inode, bh);
2607         brelse(bh);
2608     } else
2609         *level = 0;
2610 
2611     return ret;
2612 }
2613 
2614 void ocfs2_inode_unlock(struct inode *inode,
2615                int ex)
2616 {
2617     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2618     struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2619     struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2620 
2621     mlog(0, "inode %llu drop %s META lock\n",
2622          (unsigned long long)OCFS2_I(inode)->ip_blkno,
2623          ex ? "EXMODE" : "PRMODE");
2624 
2625     if (!ocfs2_is_hard_readonly(osb) &&
2626         !ocfs2_mount_local(osb))
2627         ocfs2_cluster_unlock(osb, lockres, level);
2628 }
2629 
2630 /*
2631  * These _tracker variants are introduced to deal with the recursive cluster
2632  * locking issue. The idea is to keep track of a lock holder on the stack of
2633  * the current process. If there's a lock holder on the stack, we know the
2634  * task context is already protected by cluster locking. Currently, they're
2635  * used in some VFS entry routines.
2636  *
2637  * return < 0 on error, return == 0 if there's no lock holder on the stack
2638  * before this call, return == 1 if this call would be a recursive locking.
2639  * return == -1 if this lock attempt will cause an upgrade which is forbidden.
2640  *
2641  * When taking lock levels into account, we face several different situations.
2642  *
2643  * 1. no lock is held
2644  *    In this case, just lock the inode as requested and return 0
2645  *
2646  * 2. We are holding a lock
2647  *    For this situation, things diverge into several cases:
2648  *
2649  *    wanted     holding         what to do
2650  *    ex        ex      see 2.1 below
2651  *    ex        pr      see 2.2 below
2652  *    pr        ex      see 2.1 below
2653  *    pr        pr      see 2.1 below
2654  *
2655  *    2.1 The lock level being held is compatible
2656  *    with the wanted level, so no lock action will be taken.
2657  *
2658  *    2.2 Otherwise, an upgrade is needed, but it is forbidden.
2659  *
2660  * The reason an upgrade within a process is forbidden is that
2661  * a lock upgrade may cause deadlock. The following illustrates
2662  * how it happens.
2663  *
2664  *         thread on node1                             thread on node2
2665  * ocfs2_inode_lock_tracker(ex=0)
2666  *
2667  *                                <======   ocfs2_inode_lock_tracker(ex=1)
2668  *
2669  * ocfs2_inode_lock_tracker(ex=1)
2670  */
2671 int ocfs2_inode_lock_tracker(struct inode *inode,
2672                  struct buffer_head **ret_bh,
2673                  int ex,
2674                  struct ocfs2_lock_holder *oh)
2675 {
2676     int status = 0;
2677     struct ocfs2_lock_res *lockres;
2678     struct ocfs2_lock_holder *tmp_oh;
2679     struct pid *pid = task_pid(current);
2680 
2681 
2682     lockres = &OCFS2_I(inode)->ip_inode_lockres;
2683     tmp_oh = ocfs2_pid_holder(lockres, pid);
2684 
2685     if (!tmp_oh) {
2686         /*
2687          * This corresponds to the case 1.
2688          * We haven't got any lock before.
2689          */
2690         status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0);
2691         if (status < 0) {
2692             if (status != -ENOENT)
2693                 mlog_errno(status);
2694             return status;
2695         }
2696 
2697         oh->oh_ex = ex;
2698         ocfs2_add_holder(lockres, oh);
2699         return 0;
2700     }
2701 
2702     if (unlikely(ex && !tmp_oh->oh_ex)) {
2703         /*
2704          * case 2.2 upgrade may cause dead lock, forbid it.
2705          */
2706         mlog(ML_ERROR, "Recursive locking is not permitted to "
2707              "upgrade to EX level from PR level.\n");
2708         dump_stack();
2709         return -EINVAL;
2710     }
2711 
2712     /*
2713      *  case 2.1: the OCFS2_META_LOCK_GETBH flag makes ocfs2_inode_lock_full
2714      *  ignore the lock level and just update it.
2715      */
2716     if (ret_bh) {
2717         status = ocfs2_inode_lock_full(inode, ret_bh, ex,
2718                            OCFS2_META_LOCK_GETBH);
2719         if (status < 0) {
2720             if (status != -ENOENT)
2721                 mlog_errno(status);
2722             return status;
2723         }
2724     }
2725     return 1;
2726 }
2727 
2728 void ocfs2_inode_unlock_tracker(struct inode *inode,
2729                 int ex,
2730                 struct ocfs2_lock_holder *oh,
2731                 int had_lock)
2732 {
2733     struct ocfs2_lock_res *lockres;
2734 
2735     lockres = &OCFS2_I(inode)->ip_inode_lockres;
2736     /* had_lock means that the current process already took the cluster
2737      * lock previously.
2738      * If had_lock is 1, we have nothing to do here.
2739      * If had_lock is 0, we will release the lock.
2740      */
2741     if (!had_lock) {
2742         ocfs2_inode_unlock(inode, oh->oh_ex);
2743         ocfs2_remove_holder(lockres, oh);
2744     }
2745 }
2746 
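/*
 * Editor's note: hedged usage sketch for the tracker pair above (VFS entry
 * routines are the real users; this wrapper is hypothetical).  The on-stack
 * holder makes recursion visible: had_lock == 1 means an outer frame of the
 * same process already holds the cluster lock, so the unlock side must not
 * drop it.
 */
static int example_vfs_entry(struct inode *inode)
{
	struct ocfs2_lock_holder oh;
	int had_lock;

	had_lock = ocfs2_inode_lock_tracker(inode, NULL, 0, &oh);
	if (had_lock < 0)
		return had_lock;

	/* ... do work under the (possibly inherited) cluster lock ... */

	ocfs2_inode_unlock_tracker(inode, 0, &oh, had_lock);
	return 0;
}
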
2747 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2748 {
2749     struct ocfs2_lock_res *lockres;
2750     struct ocfs2_orphan_scan_lvb *lvb;
2751     int status = 0;
2752 
2753     if (ocfs2_is_hard_readonly(osb))
2754         return -EROFS;
2755 
2756     if (ocfs2_mount_local(osb))
2757         return 0;
2758 
2759     lockres = &osb->osb_orphan_scan.os_lockres;
2760     status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2761     if (status < 0)
2762         return status;
2763 
2764     lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2765     if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2766         lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2767         *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2768     else
2769         *seqno = osb->osb_orphan_scan.os_seqno + 1;
2770 
2771     return status;
2772 }
2773 
2774 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2775 {
2776     struct ocfs2_lock_res *lockres;
2777     struct ocfs2_orphan_scan_lvb *lvb;
2778 
2779     if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2780         lockres = &osb->osb_orphan_scan.os_lockres;
2781         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2782         lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2783         lvb->lvb_os_seqno = cpu_to_be32(seqno);
2784         ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2785     }
2786 }
2787 
2788 int ocfs2_super_lock(struct ocfs2_super *osb,
2789              int ex)
2790 {
2791     int status = 0;
2792     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2793     struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2794 
2795     if (ocfs2_is_hard_readonly(osb))
2796         return -EROFS;
2797 
2798     if (ocfs2_mount_local(osb))
2799         goto bail;
2800 
2801     status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2802     if (status < 0) {
2803         mlog_errno(status);
2804         goto bail;
2805     }
2806 
2807     /* The super block lock path is really in the best position to
2808      * know when resources covered by the lock need to be
2809      * refreshed, so we do it here. Of course, making sense of
2810      * everything is up to the caller :) */
2811     status = ocfs2_should_refresh_lock_res(lockres);
2812     if (status) {
2813         status = ocfs2_refresh_slot_info(osb);
2814 
2815         ocfs2_complete_lock_res_refresh(lockres, status);
2816 
2817         if (status < 0) {
2818             ocfs2_cluster_unlock(osb, lockres, level);
2819             mlog_errno(status);
2820         }
2821         ocfs2_track_lock_refresh(lockres);
2822     }
2823 bail:
2824     return status;
2825 }
2826 
2827 void ocfs2_super_unlock(struct ocfs2_super *osb,
2828             int ex)
2829 {
2830     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2831     struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2832 
2833     if (!ocfs2_mount_local(osb))
2834         ocfs2_cluster_unlock(osb, lockres, level);
2835 }
2836 
2837 int ocfs2_rename_lock(struct ocfs2_super *osb)
2838 {
2839     int status;
2840     struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2841 
2842     if (ocfs2_is_hard_readonly(osb))
2843         return -EROFS;
2844 
2845     if (ocfs2_mount_local(osb))
2846         return 0;
2847 
2848     status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2849     if (status < 0)
2850         mlog_errno(status);
2851 
2852     return status;
2853 }
2854 
2855 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2856 {
2857     struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2858 
2859     if (!ocfs2_mount_local(osb))
2860         ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2861 }
2862 
2863 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2864 {
2865     int status;
2866     struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2867 
2868     if (ocfs2_is_hard_readonly(osb))
2869         return -EROFS;
2870 
2871     if (ex)
2872         down_write(&osb->nfs_sync_rwlock);
2873     else
2874         down_read(&osb->nfs_sync_rwlock);
2875 
2876     if (ocfs2_mount_local(osb))
2877         return 0;
2878 
2879     status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2880                     0, 0);
2881     if (status < 0) {
2882         mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2883 
2884         if (ex)
2885             up_write(&osb->nfs_sync_rwlock);
2886         else
2887             up_read(&osb->nfs_sync_rwlock);
2888     }
2889 
2890     return status;
2891 }
2892 
2893 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2894 {
2895     struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2896 
2897     if (!ocfs2_mount_local(osb))
2898         ocfs2_cluster_unlock(osb, lockres,
2899                      ex ? LKM_EXMODE : LKM_PRMODE);
2900     if (ex)
2901         up_write(&osb->nfs_sync_rwlock);
2902     else
2903         up_read(&osb->nfs_sync_rwlock);
2904 }
2905 
2906 int ocfs2_trim_fs_lock(struct ocfs2_super *osb,
2907                struct ocfs2_trim_fs_info *info, int trylock)
2908 {
2909     int status;
2910     struct ocfs2_trim_fs_lvb *lvb;
2911     struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2912 
2913     if (info)
2914         info->tf_valid = 0;
2915 
2916     if (ocfs2_is_hard_readonly(osb))
2917         return -EROFS;
2918 
2919     if (ocfs2_mount_local(osb))
2920         return 0;
2921 
2922     status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX,
2923                     trylock ? DLM_LKF_NOQUEUE : 0, 0);
2924     if (status < 0) {
2925         if (status != -EAGAIN)
2926             mlog_errno(status);
2927         return status;
2928     }
2929 
2930     if (info) {
2931         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2932         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2933             lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) {
2934             info->tf_valid = 1;
2935             info->tf_success = lvb->lvb_success;
2936             info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum);
2937             info->tf_start = be64_to_cpu(lvb->lvb_start);
2938             info->tf_len = be64_to_cpu(lvb->lvb_len);
2939             info->tf_minlen = be64_to_cpu(lvb->lvb_minlen);
2940             info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen);
2941         }
2942     }
2943 
2944     return status;
2945 }
2946 
2947 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb,
2948               struct ocfs2_trim_fs_info *info)
2949 {
2950     struct ocfs2_trim_fs_lvb *lvb;
2951     struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2952 
2953     if (ocfs2_mount_local(osb))
2954         return;
2955 
2956     if (info) {
2957         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2958         lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION;
2959         lvb->lvb_success = info->tf_success;
2960         lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum);
2961         lvb->lvb_start = cpu_to_be64(info->tf_start);
2962         lvb->lvb_len = cpu_to_be64(info->tf_len);
2963         lvb->lvb_minlen = cpu_to_be64(info->tf_minlen);
2964         lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen);
2965     }
2966 
2967     ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2968 }
2969 
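/*
 * Editor's note: hedged sketch of the trim handshake the pair above
 * supports (the real fstrim path lives elsewhere; this wrapper is
 * illustrative).  The LVB lets the node that last ran a trim publish its
 * range and result, so another node asked to trim can discover that the
 * work was already done instead of repeating it.
 */
static int example_cluster_trim(struct ocfs2_super *osb,
				struct ocfs2_trim_fs_info *info)
{
	int ret = ocfs2_trim_fs_lock(osb, info, 1);	/* trylock */

	if (ret < 0)
		return ret;	/* -EAGAIN: another node is trimming now */

	if (!(info->tf_valid && info->tf_success)) {
		/* ... perform the trim, then fill *info for the LVB ... */
	}
	ocfs2_trim_fs_unlock(osb, info);
	return 0;
}
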
2970 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2971 {
2972     int ret;
2973     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2974     struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2975     struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2976 
2977     BUG_ON(!dl);
2978 
2979     if (ocfs2_is_hard_readonly(osb)) {
2980         if (ex)
2981             return -EROFS;
2982         return 0;
2983     }
2984 
2985     if (ocfs2_mount_local(osb))
2986         return 0;
2987 
2988     ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2989     if (ret < 0)
2990         mlog_errno(ret);
2991 
2992     return ret;
2993 }
2994 
2995 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2996 {
2997     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2998     struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2999     struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
3000 
3001     if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3002         ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
3003 }
3004 
3005 /* Reference counting of the dlm debug structure. We want this because
3006  * open references on the debug inodes can live on past an unmount, so
3007  * we can't rely on the ocfs2_super to always exist. */
3008 static void ocfs2_dlm_debug_free(struct kref *kref)
3009 {
3010     struct ocfs2_dlm_debug *dlm_debug;
3011 
3012     dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
3013 
3014     kfree(dlm_debug);
3015 }
3016 
3017 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
3018 {
3019     if (dlm_debug)
3020         kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
3021 }
3022 
3023 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
3024 {
3025     kref_get(&debug->d_refcnt);
3026 }
3027 
3028 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
3029 {
3030     struct ocfs2_dlm_debug *dlm_debug;
3031 
3032     dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
3033     if (!dlm_debug) {
3034         mlog_errno(-ENOMEM);
3035         goto out;
3036     }
3037 
3038     kref_init(&dlm_debug->d_refcnt);
3039     INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
3040     dlm_debug->d_filter_secs = 0;
3041 out:
3042     return dlm_debug;
3043 }
3044 
3045 /* Access to this is arbitrated for us via seq_file->sem. */
3046 struct ocfs2_dlm_seq_priv {
3047     struct ocfs2_dlm_debug *p_dlm_debug;
3048     struct ocfs2_lock_res p_iter_res;
3049     struct ocfs2_lock_res p_tmp_res;
3050 };
3051 
3052 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
3053                          struct ocfs2_dlm_seq_priv *priv)
3054 {
3055     struct ocfs2_lock_res *iter, *ret = NULL;
3056     struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
3057 
3058     assert_spin_locked(&ocfs2_dlm_tracking_lock);
3059 
3060     list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
3061         /* discover the head of the list */
3062         if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
3063             mlog(0, "End of list found, %p\n", ret);
3064             break;
3065         }
3066 
3067         /* We track our "dummy" iteration lockres' by a NULL
3068          * l_ops field. */
3069         if (iter->l_ops != NULL) {
3070             ret = iter;
3071             break;
3072         }
3073     }
3074 
3075     return ret;
3076 }
3077 
3078 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
3079 {
3080     struct ocfs2_dlm_seq_priv *priv = m->private;
3081     struct ocfs2_lock_res *iter;
3082 
3083     spin_lock(&ocfs2_dlm_tracking_lock);
3084     iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
3085     if (iter) {
3086         /* Since lockres' have the lifetime of their container
3087          * (which can be inodes, ocfs2_supers, etc) we want to
3088          * copy this out to a temporary lockres while still
3089          * under the spinlock. Obviously after this we can't
3090          * trust any pointers on the copy returned, but that's
3091          * ok as the information we want isn't typically held
3092          * in them. */
3093         priv->p_tmp_res = *iter;
3094         iter = &priv->p_tmp_res;
3095     }
3096     spin_unlock(&ocfs2_dlm_tracking_lock);
3097 
3098     return iter;
3099 }
3100 
3101 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
3102 {
3103 }
3104 
3105 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
3106 {
3107     struct ocfs2_dlm_seq_priv *priv = m->private;
3108     struct ocfs2_lock_res *iter = v;
3109     struct ocfs2_lock_res *dummy = &priv->p_iter_res;
3110 
3111     spin_lock(&ocfs2_dlm_tracking_lock);
3112     iter = ocfs2_dlm_next_res(iter, priv);
3113     list_del_init(&dummy->l_debug_list);
3114     if (iter) {
3115         list_add(&dummy->l_debug_list, &iter->l_debug_list);
3116         priv->p_tmp_res = *iter;
3117         iter = &priv->p_tmp_res;
3118     }
3119     spin_unlock(&ocfs2_dlm_tracking_lock);
3120 
3121     return iter;
3122 }
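The start/next callbacks above rely on a sentinel trick: p_iter_res is a dummy entry, marked by its NULL l_ops, that is re-linked immediately after each element returned so the next read() resumes from the right place even if that element has since vanished (which is also why the element is copied to p_tmp_res while still under the spinlock). A stripped-down sketch of the same technique, with hypothetical names:

#include <linux/list.h>
#include <linux/spinlock.h>

struct tracked {
	struct list_head	list;
	const void		*ops;	/* NULL marks a sentinel */
};

static struct tracked *tracked_advance(spinlock_t *lock,
				       struct list_head *head,
				       struct tracked *sentinel)
{
	struct tracked *iter, *next = NULL;

	spin_lock(lock);
	list_for_each_entry(iter, &sentinel->list, list) {
		if (&iter->list == head)	/* wrapped around: done */
			break;
		if (iter->ops) {		/* first real entry */
			next = iter;
			break;
		}
	}
	list_del_init(&sentinel->list);		/* unhook from old spot */
	if (next)
		list_add(&sentinel->list, &next->list);	/* park after it */
	spin_unlock(lock);
	return next;
}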
3123 
3124 /*
3125  * Version is used by debugfs.ocfs2 to determine the format being used
3126  *
3127  * New in version 2
3128  *  - Lock stats printed
3129  * New in version 3
3130  *  - Max time in lock stats is in usecs (instead of nsecs)
3131  * New in version 4
3132  *  - Add last pr/ex unlock times and first lock wait time in usecs
3133  */
3134 #define OCFS2_DLM_DEBUG_STR_VERSION 4
3135 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
3136 {
3137     int i;
3138     char *lvb;
3139     struct ocfs2_lock_res *lockres = v;
3140 #ifdef CONFIG_OCFS2_FS_STATS
3141     u64 now, last;
3142     struct ocfs2_dlm_debug *dlm_debug =
3143             ((struct ocfs2_dlm_seq_priv *)m->private)->p_dlm_debug;
3144 #endif
3145 
3146     if (!lockres)
3147         return -EINVAL;
3148 
3149 #ifdef CONFIG_OCFS2_FS_STATS
3150     if (!lockres->l_lock_wait && dlm_debug->d_filter_secs) {
3151         now = ktime_to_us(ktime_get_real());
3152         if (lockres->l_lock_prmode.ls_last >
3153             lockres->l_lock_exmode.ls_last)
3154             last = lockres->l_lock_prmode.ls_last;
3155         else
3156             last = lockres->l_lock_exmode.ls_last;
3157         /*
3158          * Use d_filter_secs field to filter lock resources dump,
3159          * the default d_filter_secs(0) value filters nothing,
3160          * otherwise, only dump the last N seconds active lock
3161          * resources.
3162          */
3163         if (div_u64(now - last, 1000000) > dlm_debug->d_filter_secs)
3164             return 0;
3165     }
3166 #endif
3167 
3168     seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
3169 
3170     if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
3171         seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
3172                lockres->l_name,
3173                (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
3174     else
3175         seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
3176 
3177     seq_printf(m, "%d\t"
3178            "0x%lx\t"
3179            "0x%x\t"
3180            "0x%x\t"
3181            "%u\t"
3182            "%u\t"
3183            "%d\t"
3184            "%d\t",
3185            lockres->l_level,
3186            lockres->l_flags,
3187            lockres->l_action,
3188            lockres->l_unlock_action,
3189            lockres->l_ro_holders,
3190            lockres->l_ex_holders,
3191            lockres->l_requested,
3192            lockres->l_blocking);
3193 
3194     /* Dump the raw LVB */
3195     lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3196     for (i = 0; i < DLM_LVB_LEN; i++)
3197         seq_printf(m, "0x%x\t", lvb[i]);
3198 
3199 #ifdef CONFIG_OCFS2_FS_STATS
3200 # define lock_num_prmode(_l)        ((_l)->l_lock_prmode.ls_gets)
3201 # define lock_num_exmode(_l)        ((_l)->l_lock_exmode.ls_gets)
3202 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail)
3203 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail)
3204 # define lock_total_prmode(_l)      ((_l)->l_lock_prmode.ls_total)
3205 # define lock_total_exmode(_l)      ((_l)->l_lock_exmode.ls_total)
3206 # define lock_max_prmode(_l)        ((_l)->l_lock_prmode.ls_max)
3207 # define lock_max_exmode(_l)        ((_l)->l_lock_exmode.ls_max)
3208 # define lock_refresh(_l)       ((_l)->l_lock_refresh)
3209 # define lock_last_prmode(_l)       ((_l)->l_lock_prmode.ls_last)
3210 # define lock_last_exmode(_l)       ((_l)->l_lock_exmode.ls_last)
3211 # define lock_wait(_l)          ((_l)->l_lock_wait)
3212 #else
3213 # define lock_num_prmode(_l)        (0)
3214 # define lock_num_exmode(_l)        (0)
3215 # define lock_num_prmode_failed(_l) (0)
3216 # define lock_num_exmode_failed(_l) (0)
3217 # define lock_total_prmode(_l)      (0ULL)
3218 # define lock_total_exmode(_l)      (0ULL)
3219 # define lock_max_prmode(_l)        (0)
3220 # define lock_max_exmode(_l)        (0)
3221 # define lock_refresh(_l)       (0)
3222 # define lock_last_prmode(_l)       (0ULL)
3223 # define lock_last_exmode(_l)       (0ULL)
3224 # define lock_wait(_l)          (0ULL)
3225 #endif
3226     /* The following seq_printf was added in version 2 of this output */
3227     seq_printf(m, "%u\t"
3228            "%u\t"
3229            "%u\t"
3230            "%u\t"
3231            "%llu\t"
3232            "%llu\t"
3233            "%u\t"
3234            "%u\t"
3235            "%u\t"
3236            "%llu\t"
3237            "%llu\t"
3238            "%llu\t",
3239            lock_num_prmode(lockres),
3240            lock_num_exmode(lockres),
3241            lock_num_prmode_failed(lockres),
3242            lock_num_exmode_failed(lockres),
3243            lock_total_prmode(lockres),
3244            lock_total_exmode(lockres),
3245            lock_max_prmode(lockres),
3246            lock_max_exmode(lockres),
3247            lock_refresh(lockres),
3248            lock_last_prmode(lockres),
3249            lock_last_exmode(lockres),
3250            lock_wait(lockres));
3251 
3252     /* End the line */
3253     seq_printf(m, "\n");
3254     return 0;
3255 }
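Each record produced above is a single tab-separated line: the version, the lock name, the level/flags/holder fields, the raw LVB bytes, and, from version 2 on, the stats columns. A hedged user-space sketch that consumes just the first two fields; the debugfs path is a placeholder, since the uuid directory differs per volume:

#include <stdio.h>

int main(void)
{
	/* placeholder path: substitute the volume's uuid directory */
	FILE *f = fopen("/sys/kernel/debug/ocfs2/UUID/locking_state", "r");
	unsigned int version;
	char name[64];

	if (!f)
		return 1;
	while (fscanf(f, "%x %63s", &version, name) == 2) {
		printf("v%u %s\n", version, name);
		fscanf(f, "%*[^\n]");	/* skip the rest of the record */
	}
	fclose(f);
	return 0;
}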
3256 
3257 static const struct seq_operations ocfs2_dlm_seq_ops = {
3258     .start =    ocfs2_dlm_seq_start,
3259     .stop =     ocfs2_dlm_seq_stop,
3260     .next =     ocfs2_dlm_seq_next,
3261     .show =     ocfs2_dlm_seq_show,
3262 };
3263 
3264 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
3265 {
3266     struct seq_file *seq = file->private_data;
3267     struct ocfs2_dlm_seq_priv *priv = seq->private;
3268     struct ocfs2_lock_res *res = &priv->p_iter_res;
3269 
3270     ocfs2_remove_lockres_tracking(res);
3271     ocfs2_put_dlm_debug(priv->p_dlm_debug);
3272     return seq_release_private(inode, file);
3273 }
3274 
3275 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
3276 {
3277     struct ocfs2_dlm_seq_priv *priv;
3278     struct ocfs2_super *osb;
3279 
3280     priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
3281     if (!priv) {
3282         mlog_errno(-ENOMEM);
3283         return -ENOMEM;
3284     }
3285 
3286     osb = inode->i_private;
3287     ocfs2_get_dlm_debug(osb->osb_dlm_debug);
3288     priv->p_dlm_debug = osb->osb_dlm_debug;
3289     INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
3290 
3291     ocfs2_add_lockres_tracking(&priv->p_iter_res,
3292                    priv->p_dlm_debug);
3293 
3294     return 0;
3295 }
3296 
3297 static const struct file_operations ocfs2_dlm_debug_fops = {
3298     .open =     ocfs2_dlm_debug_open,
3299     .release =  ocfs2_dlm_debug_release,
3300     .read =     seq_read,
3301     .llseek =   seq_lseek,
3302 };
3303 
3304 static void ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3305 {
3306     struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3307 
3308     debugfs_create_file("locking_state", S_IFREG|S_IRUSR,
3309                 osb->osb_debug_root, osb, &ocfs2_dlm_debug_fops);
3310 
3311     debugfs_create_u32("locking_filter", 0600, osb->osb_debug_root,
3312                &dlm_debug->d_filter_secs);
3313     ocfs2_get_dlm_debug(dlm_debug);
3314 }
3315 
3316 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3317 {
3318     struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3319 
3320     if (dlm_debug)
3321         ocfs2_put_dlm_debug(dlm_debug);
3322 }
3323 
3324 int ocfs2_dlm_init(struct ocfs2_super *osb)
3325 {
3326     int status = 0;
3327     struct ocfs2_cluster_connection *conn = NULL;
3328 
3329     if (ocfs2_mount_local(osb)) {
3330         osb->node_num = 0;
3331         goto local;
3332     }
3333 
3334     ocfs2_dlm_init_debug(osb);
3335 
3336     /* launch downconvert thread */
3337     osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s",
3338             osb->uuid_str);
3339     if (IS_ERR(osb->dc_task)) {
3340         status = PTR_ERR(osb->dc_task);
3341         osb->dc_task = NULL;
3342         mlog_errno(status);
3343         goto bail;
3344     }
3345 
3346     /* for now, uuid == domain */
3347     status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3348                        osb->osb_cluster_name,
3349                        strlen(osb->osb_cluster_name),
3350                        osb->uuid_str,
3351                        strlen(osb->uuid_str),
3352                        &lproto, ocfs2_do_node_down, osb,
3353                        &conn);
3354     if (status) {
3355         mlog_errno(status);
3356         goto bail;
3357     }
3358 
3359     status = ocfs2_cluster_this_node(conn, &osb->node_num);
3360     if (status < 0) {
3361         mlog_errno(status);
3362         mlog(ML_ERROR,
3363              "could not find this host's node number\n");
3364         ocfs2_cluster_disconnect(conn, 0);
3365         goto bail;
3366     }
3367 
3368 local:
3369     ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3370     ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3371     ocfs2_nfs_sync_lock_init(osb);
3372     ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3373 
3374     osb->cconn = conn;
3375 bail:
3376     if (status < 0) {
3377         ocfs2_dlm_shutdown_debug(osb);
3378         if (osb->dc_task)
3379             kthread_stop(osb->dc_task);
3380     }
3381 
3382     return status;
3383 }
3384 
3385 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3386             int hangup_pending)
3387 {
3388     ocfs2_drop_osb_locks(osb);
3389 
3390     /*
3391      * Now that we have dropped all locks and ocfs2_dismount_volume()
3392      * has disabled recovery, the DLM won't be talking to us.  It's
3393      * safe to tear things down before disconnecting the cluster.
3394      */
3395 
3396     if (osb->dc_task) {
3397         kthread_stop(osb->dc_task);
3398         osb->dc_task = NULL;
3399     }
3400 
3401     ocfs2_lock_res_free(&osb->osb_super_lockres);
3402     ocfs2_lock_res_free(&osb->osb_rename_lockres);
3403     ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3404     ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3405 
3406     if (osb->cconn) {
3407         ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3408         osb->cconn = NULL;
3409 
3410         ocfs2_dlm_shutdown_debug(osb);
3411     }
3412 }
3413 
3414 static int ocfs2_drop_lock(struct ocfs2_super *osb,
3415                struct ocfs2_lock_res *lockres)
3416 {
3417     int ret;
3418     unsigned long flags;
3419     u32 lkm_flags = 0;
3420 
3421     /* We didn't get anywhere near actually using this lockres. */
3422     if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3423         goto out;
3424 
3425     if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3426         lkm_flags |= DLM_LKF_VALBLK;
3427 
3428     spin_lock_irqsave(&lockres->l_lock, flags);
3429 
3430     mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3431             "lockres %s, flags 0x%lx\n",
3432             lockres->l_name, lockres->l_flags);
3433 
3434     while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3435         mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3436              "%u, unlock_action = %u\n",
3437              lockres->l_name, lockres->l_flags, lockres->l_action,
3438              lockres->l_unlock_action);
3439 
3440         spin_unlock_irqrestore(&lockres->l_lock, flags);
3441 
3442         /* XXX: Today we just wait on any busy
3443          * locks... Perhaps we need to cancel converts in the
3444          * future? */
3445         ocfs2_wait_on_busy_lock(lockres);
3446 
3447         spin_lock_irqsave(&lockres->l_lock, flags);
3448     }
3449 
3450     if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3451         if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3452             lockres->l_level == DLM_LOCK_EX &&
3453             !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3454             lockres->l_ops->set_lvb(lockres);
3455     }
3456 
3457     if (lockres->l_flags & OCFS2_LOCK_BUSY)
3458         mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3459              lockres->l_name);
3460     if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3461         mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3462 
3463     if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3464         spin_unlock_irqrestore(&lockres->l_lock, flags);
3465         goto out;
3466     }
3467 
3468     lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3469 
3470     /* make sure we never get here while waiting for an ast to
3471      * fire. */
3472     BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3473 
3474     /* is this necessary? */
3475     lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3476     lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3477     spin_unlock_irqrestore(&lockres->l_lock, flags);
3478 
3479     mlog(0, "lock %s\n", lockres->l_name);
3480 
3481     ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3482     if (ret) {
3483         ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3484         mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3485         ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3486         BUG();
3487     }
3488     mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3489          lockres->l_name);
3490 
3491     ocfs2_wait_on_busy_lock(lockres);
3492 out:
3493     return 0;
3494 }
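The busy-wait loop above is this file's standard shape for sleeping on a flag guarded by a spinlock: drop the lock, wait, retake it, re-check. The same shape in isolation, as a minimal sketch with hypothetical names:

#include <linux/spinlock.h>
#include <linux/wait.h>

/*
 * The flag word is guarded by the spinlock, so we must drop the lock
 * before sleeping and re-check the flag once we have it back.
 */
static void wait_flag_clear(spinlock_t *lock, unsigned long *word,
			    unsigned long flag, wait_queue_head_t *wq)
{
	unsigned long irqflags;

	spin_lock_irqsave(lock, irqflags);
	while (*word & flag) {
		spin_unlock_irqrestore(lock, irqflags);
		wait_event(*wq, !(READ_ONCE(*word) & flag));
		spin_lock_irqsave(lock, irqflags);
	}
	spin_unlock_irqrestore(lock, irqflags);
}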
3495 
3496 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3497                        struct ocfs2_lock_res *lockres);
3498 
3499 /* Mark the lockres as being dropped. It will no longer be
3500  * queued if blocking, but we still may have to wait on it
3501  * being dequeued from the downconvert thread before we can consider
3502  * it safe to drop.
3503  *
3504  * You can *not* attempt to call cluster_lock on this lockres anymore. */
3505 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
3506                 struct ocfs2_lock_res *lockres)
3507 {
3508     int status;
3509     struct ocfs2_mask_waiter mw;
3510     unsigned long flags, flags2;
3511 
3512     ocfs2_init_mask_waiter(&mw);
3513 
3514     spin_lock_irqsave(&lockres->l_lock, flags);
3515     lockres->l_flags |= OCFS2_LOCK_FREEING;
3516     if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
3517         /*
3518          * We know the downconvert is queued but not in progress
3519          * because we are the downconvert thread and are processing
3520          * a different lock. So we can just remove the lock from the
3521          * queue. This is not only an optimization but also a way
3522          * to avoid the following deadlock:
3523          *   ocfs2_dentry_post_unlock()
3524          *     ocfs2_dentry_lock_put()
3525          *       ocfs2_drop_dentry_lock()
3526          *         iput()
3527          *           ocfs2_evict_inode()
3528          *             ocfs2_clear_inode()
3529          *               ocfs2_mark_lockres_freeing()
3530          *                 ... blocks waiting for OCFS2_LOCK_QUEUED
3531          *                 since we are the downconvert thread which
3532          *                 should clear the flag.
3533          */
3534         spin_unlock_irqrestore(&lockres->l_lock, flags);
3535         spin_lock_irqsave(&osb->dc_task_lock, flags2);
3536         list_del_init(&lockres->l_blocked_list);
3537         osb->blocked_lock_count--;
3538         spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
3539         /*
3540          * Warn if we recurse into another post_unlock call.  Strictly
3541          * speaking it isn't a problem, but we need to be careful if
3542          * that happens (stack overflow, deadlocks, ...), so warn if
3543          * ocfs2 grows a path for which this can happen.
3544          */
3545         WARN_ON_ONCE(lockres->l_ops->post_unlock);
3546         /* Since the lock is freeing we don't do much in the fn below */
3547         ocfs2_process_blocked_lock(osb, lockres);
3548         return;
3549     }
3550     while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3551         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3552         spin_unlock_irqrestore(&lockres->l_lock, flags);
3553 
3554         mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3555 
3556         status = ocfs2_wait_for_mask(&mw);
3557         if (status)
3558             mlog_errno(status);
3559 
3560         spin_lock_irqsave(&lockres->l_lock, flags);
3561     }
3562     spin_unlock_irqrestore(&lockres->l_lock, flags);
3563 }
3564 
3565 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3566                    struct ocfs2_lock_res *lockres)
3567 {
3568     int ret;
3569 
3570     ocfs2_mark_lockres_freeing(osb, lockres);
3571     ret = ocfs2_drop_lock(osb, lockres);
3572     if (ret)
3573         mlog_errno(ret);
3574 }
3575 
3576 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3577 {
3578     ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3579     ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3580     ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3581     ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3582 }
3583 
3584 int ocfs2_drop_inode_locks(struct inode *inode)
3585 {
3586     int status, err;
3587 
3588     /* No need to call ocfs2_mark_lockres_freeing here -
3589      * ocfs2_clear_inode has done it for us. */
3590 
3591     err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3592                   &OCFS2_I(inode)->ip_open_lockres);
3593     if (err < 0)
3594         mlog_errno(err);
3595 
3596     status = err;
3597 
3598     err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3599                   &OCFS2_I(inode)->ip_inode_lockres);
3600     if (err < 0)
3601         mlog_errno(err);
3602     if (err < 0 && !status)
3603         status = err;
3604 
3605     err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3606                   &OCFS2_I(inode)->ip_rw_lockres);
3607     if (err < 0)
3608         mlog_errno(err);
3609     if (err < 0 && !status)
3610         status = err;
3611 
3612     return status;
3613 }
3614 
3615 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3616                           int new_level)
3617 {
3618     assert_spin_locked(&lockres->l_lock);
3619 
3620     BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3621 
3622     if (lockres->l_level <= new_level) {
3623         mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3624              "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3625              "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3626              new_level, list_empty(&lockres->l_blocked_list),
3627              list_empty(&lockres->l_mask_waiters), lockres->l_type,
3628              lockres->l_flags, lockres->l_ro_holders,
3629              lockres->l_ex_holders, lockres->l_action,
3630              lockres->l_unlock_action, lockres->l_requested,
3631              lockres->l_blocking, lockres->l_pending_gen);
3632         BUG();
3633     }
3634 
3635     mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3636          lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3637 
3638     lockres->l_action = OCFS2_AST_DOWNCONVERT;
3639     lockres->l_requested = new_level;
3640     lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3641     return lockres_set_pending(lockres);
3642 }
3643 
3644 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3645                   struct ocfs2_lock_res *lockres,
3646                   int new_level,
3647                   int lvb,
3648                   unsigned int generation)
3649 {
3650     int ret;
3651     u32 dlm_flags = DLM_LKF_CONVERT;
3652 
3653     mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3654          lockres->l_level, new_level);
3655 
3656     /*
3657      * fsdlm handles DLM_LKF_VALBLK differently from o2cb. It always
3658      * expects DLM_LKF_VALBLK to be set if the LKB has an LVB, so that
3659      * we can recover correctly from node failure. Otherwise, we may get
3660      * an invalid LVB in the LKB without DLM_SBF_VALNOTVALID being set.
3661      */
3662     if (ocfs2_userspace_stack(osb) &&
3663         lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3664         lvb = 1;
3665 
3666     if (lvb)
3667         dlm_flags |= DLM_LKF_VALBLK;
3668 
3669     ret = ocfs2_dlm_lock(osb->cconn,
3670                  new_level,
3671                  &lockres->l_lksb,
3672                  dlm_flags,
3673                  lockres->l_name,
3674                  OCFS2_LOCK_ID_MAX_LEN - 1);
3675     lockres_clear_pending(lockres, generation, osb);
3676     if (ret) {
3677         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3678         ocfs2_recover_from_dlm_error(lockres, 1);
3679         goto bail;
3680     }
3681 
3682     ret = 0;
3683 bail:
3684     return ret;
3685 }
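A downconvert is just a conversion request with the right flags. A tiny sketch of the flag assembly mirrored from the code above (the wants_lvb parameter is hypothetical):

#include <linux/dlm.h>
#include <linux/types.h>

static u32 my_convert_flags(bool wants_lvb)
{
	u32 flags = DLM_LKF_CONVERT;	/* convert an existing lock */

	if (wants_lvb)
		flags |= DLM_LKF_VALBLK;	/* carry the LVB along */
	return flags;
}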
3686 
3687 /* returns 1 when the caller should drop the spinlock and call ocfs2_dlm_unlock */
3688 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3689                         struct ocfs2_lock_res *lockres)
3690 {
3691     assert_spin_locked(&lockres->l_lock);
3692 
3693     if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3694         /* If we're already trying to cancel a lock conversion
3695          * then just drop the spinlock and allow the caller to
3696          * requeue this lock. */
3697         mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3698         return 0;
3699     }
3700 
3701     /* were we in a convert when we got the bast fire? */
3702     BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3703            lockres->l_action != OCFS2_AST_DOWNCONVERT);
3704     /* set things up for the unlockast to know to just
3705      * clear out the ast_action and unset busy, etc. */
3706     lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3707 
3708     mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3709             "lock %s, invalid flags: 0x%lx\n",
3710             lockres->l_name, lockres->l_flags);
3711 
3712     mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3713 
3714     return 1;
3715 }
3716 
3717 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3718                 struct ocfs2_lock_res *lockres)
3719 {
3720     int ret;
3721 
3722     ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3723                    DLM_LKF_CANCEL);
3724     if (ret) {
3725         ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3726         ocfs2_recover_from_dlm_error(lockres, 0);
3727     }
3728 
3729     mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3730 
3731     return ret;
3732 }
3733 
3734 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3735                   struct ocfs2_lock_res *lockres,
3736                   struct ocfs2_unblock_ctl *ctl)
3737 {
3738     unsigned long flags;
3739     int blocking;
3740     int new_level;
3741     int level;
3742     int ret = 0;
3743     int set_lvb = 0;
3744     unsigned int gen;
3745 
3746     spin_lock_irqsave(&lockres->l_lock, flags);
3747 
3748 recheck:
3749     /*
3750      * Is it still blocking? If not, we have no more work to do.
3751      */
3752     if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3753         BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3754         spin_unlock_irqrestore(&lockres->l_lock, flags);
3755         ret = 0;
3756         goto leave;
3757     }
3758 
3759     if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3760         /* XXX
3761          * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3762          * exists entirely for one reason - another thread has set
3763          * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3764          *
3765          * If we do ocfs2_cancel_convert() before the other thread
3766          * calls dlm_lock(), our cancel will do nothing.  We will
3767          * get no ast, and we will have no way of knowing the
3768          * cancel failed.  Meanwhile, the other thread will call
3769          * into dlm_lock() and wait...forever.
3770          *
3771          * Why forever?  Because another node has asked for the
3772          * lock first; that's why we're here in unblock_lock().
3773          *
3774          * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3775          * set, we just requeue the unblock.  Only when the other
3776          * thread has called dlm_lock() and cleared PENDING will
3777          * we then cancel their request.
3778          *
3779          * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3780          * at the same time they set OCFS2_LOCK_BUSY.  They must
3781          * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3782          */
3783         if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3784             mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3785                  lockres->l_name);
3786             goto leave_requeue;
3787         }
3788 
3789         ctl->requeue = 1;
3790         ret = ocfs2_prepare_cancel_convert(osb, lockres);
3791         spin_unlock_irqrestore(&lockres->l_lock, flags);
3792         if (ret) {
3793             ret = ocfs2_cancel_convert(osb, lockres);
3794             if (ret < 0)
3795                 mlog_errno(ret);
3796         }
3797         goto leave;
3798     }
3799 
3800     /*
3801      * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3802      * set when the ast is received for an upconvert just before the
3803      * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3804      * on the heels of the ast, we want to delay the downconvert just
3805      * enough to allow the up requestor to do its task. Because this
3806      * lock is in the blocked queue, the lock will be downconverted
3807      * as soon as the requestor is done with the lock.
3808      */
3809     if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3810         goto leave_requeue;
3811 
3812     /*
3813      * How can we block and yet be at NL?  We were trying to upconvert
3814      * from NL and got canceled.  The code comes back here, and now
3815      * we notice and clear BLOCKING.
3816      */
3817     if (lockres->l_level == DLM_LOCK_NL) {
3818         BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3819         mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3820         lockres->l_blocking = DLM_LOCK_NL;
3821         lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3822         spin_unlock_irqrestore(&lockres->l_lock, flags);
3823         goto leave;
3824     }
3825 
3826     /* if we're blocking an exclusive and we have *any* holders,
3827      * then requeue. */
3828     if ((lockres->l_blocking == DLM_LOCK_EX)
3829         && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3830         mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3831              lockres->l_name, lockres->l_ex_holders,
3832              lockres->l_ro_holders);
3833         goto leave_requeue;
3834     }
3835 
3836     /* If it's a PR we're blocking, then only
3837      * requeue if we've got any EX holders */
3838     if (lockres->l_blocking == DLM_LOCK_PR &&
3839         lockres->l_ex_holders) {
3840         mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3841              lockres->l_name, lockres->l_ex_holders);
3842         goto leave_requeue;
3843     }
3844 
3845     /*
3846      * Can we get a lock in this state if the holder counts are
3847      * zero? The metadata unblock code used to check this.
3848      */
3849     if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3850         && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3851         mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3852              lockres->l_name);
3853         goto leave_requeue;
3854     }
3855 
3856     new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3857 
3858     if (lockres->l_ops->check_downconvert
3859         && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3860         mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3861              lockres->l_name);
3862         goto leave_requeue;
3863     }
3864 
3865     /* If we get here, then we know that there are no more
3866      * incompatible holders (and anyone asking for an incompatible
3867      * lock is blocked). We can now downconvert the lock */
3868     if (!lockres->l_ops->downconvert_worker)
3869         goto downconvert;
3870 
3871     /* Some lockres types want to do a bit of work before
3872      * downconverting a lock. Allow that here. The worker function
3873      * may sleep, so we save off a copy of what we're blocking as
3874      * it may change while we're not holding the spin lock. */
3875     blocking = lockres->l_blocking;
3876     level = lockres->l_level;
3877     spin_unlock_irqrestore(&lockres->l_lock, flags);
3878 
3879     ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3880 
3881     if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3882         mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3883              lockres->l_name);
3884         goto leave;
3885     }
3886 
3887     spin_lock_irqsave(&lockres->l_lock, flags);
3888     if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3889         /* If this changed underneath us, then we can't drop
3890          * it just yet. */
3891         mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3892              "Recheck\n", lockres->l_name, blocking,
3893              lockres->l_blocking, level, lockres->l_level);
3894         goto recheck;
3895     }
3896 
3897 downconvert:
3898     ctl->requeue = 0;
3899 
3900     if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3901         if (lockres->l_level == DLM_LOCK_EX)
3902             set_lvb = 1;
3903 
3904         /*
3905          * We only set the lvb if the lock has been fully
3906          * refreshed - otherwise we risk setting stale
3907          * data. In that case, there's no need to actually clear
3908          * out the lvb here as its value is still valid.
3909          */
3910         if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3911             lockres->l_ops->set_lvb(lockres);
3912     }
3913 
3914     gen = ocfs2_prepare_downconvert(lockres, new_level);
3915     spin_unlock_irqrestore(&lockres->l_lock, flags);
3916     ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3917                      gen);
3918     /* The dlm lock convert is being cancelled in the background;
3919      * ocfs2_cancel_convert() is asynchronous in fs/dlm.
3920      * Requeue the lock and try again later.
3921      */
3922     if (ret == -EBUSY) {
3923         ctl->requeue = 1;
3924         mlog(ML_BASTS, "lockres %s, ReQ: Downconvert busy\n",
3925              lockres->l_name);
3926         ret = 0;
3927         msleep(20);
3928     }
3929 
3930 leave:
3931     if (ret)
3932         mlog_errno(ret);
3933     return ret;
3934 
3935 leave_requeue:
3936     spin_unlock_irqrestore(&lockres->l_lock, flags);
3937     ctl->requeue = 1;
3938 
3939     return 0;
3940 }
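The PENDING protocol in the long comment above is easiest to see from the requesting side: BUSY and PENDING are set together under the spinlock before dlm_lock() is issued, and PENDING is cleared only once dlm_lock() has returned, so a cancel is never attempted in the window where it would be silently lost. A sketch with hypothetical names:

#include <linux/spinlock.h>

#define MY_LOCK_BUSY	0x01
#define MY_LOCK_PENDING	0x02

struct my_lockres {
	spinlock_t	lock;
	unsigned long	flags;
};

void my_dlm_lock(struct my_lockres *res);	/* hypothetical async request */

static void my_issue_convert(struct my_lockres *res)
{
	spin_lock(&res->lock);
	res->flags |= MY_LOCK_BUSY | MY_LOCK_PENDING;
	spin_unlock(&res->lock);

	my_dlm_lock(res);		/* the AST will clear BUSY later */

	spin_lock(&res->lock);
	res->flags &= ~MY_LOCK_PENDING;	/* a cancel is now meaningful */
	spin_unlock(&res->lock);
}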
3941 
3942 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3943                      int blocking)
3944 {
3945     struct inode *inode;
3946     struct address_space *mapping;
3947     struct ocfs2_inode_info *oi;
3948 
3949     inode = ocfs2_lock_res_inode(lockres);
3950     mapping = inode->i_mapping;
3951 
3952     if (S_ISDIR(inode->i_mode)) {
3953         oi = OCFS2_I(inode);
3954         oi->ip_dir_lock_gen++;
3955         mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3956         goto out_forget;
3957     }
3958 
3959     if (!S_ISREG(inode->i_mode))
3960         goto out;
3961 
3962     /*
3963      * We need this before the filemap_fdatawrite() so that it can
3964      * transfer the dirty bit from the PTE to the
3965      * page. Unfortunately this means that even for EX->PR
3966      * downconverts, we'll lose our mappings and have to build
3967      * them up again.
3968      */
3969     unmap_mapping_range(mapping, 0, 0, 0);
3970 
3971     if (filemap_fdatawrite(mapping)) {
3972         mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3973              (unsigned long long)OCFS2_I(inode)->ip_blkno);
3974     }
3975     sync_mapping_buffers(mapping);
3976     if (blocking == DLM_LOCK_EX) {
3977         truncate_inode_pages(mapping, 0);
3978     } else {
3979         /* We only need to wait on the I/O if we're not also
3980          * truncating pages because truncate_inode_pages waits
3981          * for us above. We don't truncate pages if we're
3982          * blocking anything < EXMODE because we want to keep
3983          * them around in that case. */
3984         filemap_fdatawait(mapping);
3985     }
3986 
3987 out_forget:
3988     forget_all_cached_acls(inode);
3989 
3990 out:
3991     return UNBLOCK_CONTINUE;
3992 }
3993 
3994 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3995                  struct ocfs2_lock_res *lockres,
3996                  int new_level)
3997 {
3998     int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3999 
4000     BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
4001     BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
4002 
4003     if (checkpointed)
4004         return 1;
4005 
4006     ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
4007     return 0;
4008 }
4009 
4010 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
4011                     int new_level)
4012 {
4013     struct inode *inode = ocfs2_lock_res_inode(lockres);
4014 
4015     return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
4016 }
4017 
4018 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
4019 {
4020     struct inode *inode = ocfs2_lock_res_inode(lockres);
4021 
4022     __ocfs2_stuff_meta_lvb(inode);
4023 }
4024 
4025 /*
4026  * Does the final reference drop on our dentry lock. Right now this
4027  * happens in the downconvert thread, but we could choose to simplify the
4028  * dlmglue API and push these off to the ocfs2_wq in the future.
4029  */
4030 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
4031                      struct ocfs2_lock_res *lockres)
4032 {
4033     struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
4034     ocfs2_dentry_lock_put(osb, dl);
4035 }
4036 
4037 /*
4038  * d_delete() matching dentries before the lock downconvert.
4039  *
4040  * At this point, any process waiting to destroy the
4041  * dentry_lock due to last ref count is stopped by the
4042  * OCFS2_LOCK_QUEUED flag.
4043  *
4044  * We have two potential problems
4045  *
4046  * 1) If we do the last reference drop on our dentry_lock (via dput)
4047  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
4048  *    the downconvert to finish. Instead we take an elevated
4049  *    reference and push the drop until after we've completed our
4050  *    unblock processing.
4051  *
4052  * 2) There might be another process with a final reference,
4053  *    waiting on us to finish processing. If this is the case, we
4054  *    detect it and exit out - there's no more dentries anyway.
4055  *    detect it and exit out - there are no more dentries anyway.
4056 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
4057                        int blocking)
4058 {
4059     struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
4060     struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
4061     struct dentry *dentry;
4062     unsigned long flags;
4063     int extra_ref = 0;
4064 
4065     /*
4066      * This node is blocking another node from getting a read
4067      * lock. This happens when we've renamed within a
4068      * directory. We've forced the other nodes to d_delete(), but
4069      * we never actually dropped our lock because it's still
4070      * valid. The downconvert code will retain a PR for this node,
4071      * so there's no further work to do.
4072      */
4073     if (blocking == DLM_LOCK_PR)
4074         return UNBLOCK_CONTINUE;
4075 
4076     /*
4077      * Mark this inode as potentially orphaned. The code in
4078      * ocfs2_delete_inode() will figure out whether it actually
4079      * needs to be freed or not.
4080      */
4081     spin_lock(&oi->ip_lock);
4082     oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
4083     spin_unlock(&oi->ip_lock);
4084 
4085     /*
4086      * Yuck. We need to make sure however that the check of
4087      * OCFS2_LOCK_FREEING and the extra reference are atomic with
4088      * respect to a reference decrement or the setting of that
4089      * flag.
4090      */
4091     spin_lock_irqsave(&lockres->l_lock, flags);
4092     spin_lock(&dentry_attach_lock);
4093     if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
4094         && dl->dl_count) {
4095         dl->dl_count++;
4096         extra_ref = 1;
4097     }
4098     spin_unlock(&dentry_attach_lock);
4099     spin_unlock_irqrestore(&lockres->l_lock, flags);
4100 
4101     mlog(0, "extra_ref = %d\n", extra_ref);
4102 
4103     /*
4104      * We have a process waiting on us in ocfs2_dentry_iput(),
4105      * which means we can't have any more outstanding
4106      * aliases. There's no need to do any more work.
4107      */
4108     if (!extra_ref)
4109         return UNBLOCK_CONTINUE;
4110 
4111     spin_lock(&dentry_attach_lock);
4112     while (1) {
4113         dentry = ocfs2_find_local_alias(dl->dl_inode,
4114                         dl->dl_parent_blkno, 1);
4115         if (!dentry)
4116             break;
4117         spin_unlock(&dentry_attach_lock);
4118 
4119         if (S_ISDIR(dl->dl_inode->i_mode))
4120             shrink_dcache_parent(dentry);
4121 
4122         mlog(0, "d_delete(%pd);\n", dentry);
4123 
4124         /*
4125          * The following dcache calls may do an
4126          * iput(). Normally we don't want that from the
4127          * downconverting thread, but in this case it's ok
4128          * because the requesting node already has an
4129          * exclusive lock on the inode, so it can't be queued
4130          * for a downconvert.
4131          */
4132         d_delete(dentry);
4133         dput(dentry);
4134 
4135         spin_lock(&dentry_attach_lock);
4136     }
4137     spin_unlock(&dentry_attach_lock);
4138 
4139     /*
4140      * If we are the last holder of this dentry lock, there is no
4141      * reason to downconvert so skip straight to the unlock.
4142      */
4143     if (dl->dl_count == 1)
4144         return UNBLOCK_STOP_POST;
4145 
4146     return UNBLOCK_CONTINUE_POST;
4147 }
4148 
4149 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
4150                         int new_level)
4151 {
4152     struct ocfs2_refcount_tree *tree =
4153                 ocfs2_lock_res_refcount_tree(lockres);
4154 
4155     return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
4156 }
4157 
4158 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
4159                      int blocking)
4160 {
4161     struct ocfs2_refcount_tree *tree =
4162                 ocfs2_lock_res_refcount_tree(lockres);
4163 
4164     ocfs2_metadata_cache_purge(&tree->rf_ci);
4165 
4166     return UNBLOCK_CONTINUE;
4167 }
4168 
4169 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
4170 {
4171     struct ocfs2_qinfo_lvb *lvb;
4172     struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
4173     struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4174                         oinfo->dqi_gi.dqi_type);
4175 
4176     lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4177     lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
4178     lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
4179     lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
4180     lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
4181     lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
4182     lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
4183     lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
4184 }
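The LVB crosses nodes of possibly different endianness, which is why every field above is stored with cpu_to_be32() and read back with be32_to_cpu() in ocfs2_refresh_qinfo() below. The symmetric pair in miniature, with a hypothetical one-field LVB:

#include <linux/types.h>
#include <asm/byteorder.h>

struct my_lvb {
	__be32 value;		/* hypothetical single-field LVB */
};

static void my_lvb_pack(struct my_lvb *lvb, u32 value)
{
	lvb->value = cpu_to_be32(value);	/* writer side */
}

static u32 my_lvb_unpack(const struct my_lvb *lvb)
{
	return be32_to_cpu(lvb->value);		/* reader side */
}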
4185 
4186 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4187 {
4188     struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4189     struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4190     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4191 
4192     if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
4193         ocfs2_cluster_unlock(osb, lockres, level);
4194 }
4195 
4196 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
4197 {
4198     struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4199                         oinfo->dqi_gi.dqi_type);
4200     struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4201     struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4202     struct buffer_head *bh = NULL;
4203     struct ocfs2_global_disk_dqinfo *gdinfo;
4204     int status = 0;
4205 
4206     if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
4207         lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
4208         info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
4209         info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
4210         oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
4211         oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
4212         oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
4213         oinfo->dqi_gi.dqi_free_entry =
4214                     be32_to_cpu(lvb->lvb_free_entry);
4215     } else {
4216         status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
4217                              oinfo->dqi_giblk, &bh);
4218         if (status) {
4219             mlog_errno(status);
4220             goto bail;
4221         }
4222         gdinfo = (struct ocfs2_global_disk_dqinfo *)
4223                     (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
4224         info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
4225         info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
4226         oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
4227         oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
4228         oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
4229         oinfo->dqi_gi.dqi_free_entry =
4230                     le32_to_cpu(gdinfo->dqi_free_entry);
4231         brelse(bh);
4232         ocfs2_track_lock_refresh(lockres);
4233     }
4234 
4235 bail:
4236     return status;
4237 }
4238 
4239 /* Lock quota info; this function expects at least a shared lock on the quota file
4240  * so that we can safely refresh quota info from disk. */
4241 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4242 {
4243     struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4244     struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4245     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4246     int status = 0;
4247 
4248     /* On RO devices, locking really isn't needed... */
4249     if (ocfs2_is_hard_readonly(osb)) {
4250         if (ex)
4251             status = -EROFS;
4252         goto bail;
4253     }
4254     if (ocfs2_mount_local(osb))
4255         goto bail;
4256 
4257     status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4258     if (status < 0) {
4259         mlog_errno(status);
4260         goto bail;
4261     }
4262     if (!ocfs2_should_refresh_lock_res(lockres))
4263         goto bail;
4264     /* OK, we have the lock but we need to refresh the quota info */
4265     status = ocfs2_refresh_qinfo(oinfo);
4266     if (status)
4267         ocfs2_qinfo_unlock(oinfo, ex);
4268     ocfs2_complete_lock_res_refresh(lockres, status);
4269 bail:
4270     return status;
4271 }
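A sketch of the expected caller pairing, assuming a context that already satisfies the header comment's shared-lock requirement on the quota file: take the lock at the desired level, operate, then unlock with the same ex value:

static int my_read_quota_info(struct ocfs2_mem_dqinfo *oinfo)
{
	int status = ocfs2_qinfo_lock(oinfo, 0);	/* shared (PR) */

	if (status)
		return status;
	/* ... fields are now refreshed from the LVB or from disk ... */
	ocfs2_qinfo_unlock(oinfo, 0);			/* same "ex" value */
	return 0;
}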
4272 
4273 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
4274 {
4275     int status;
4276     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4277     struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4278     struct ocfs2_super *osb = lockres->l_priv;
4279 
4281     if (ocfs2_is_hard_readonly(osb))
4282         return -EROFS;
4283 
4284     if (ocfs2_mount_local(osb))
4285         return 0;
4286 
4287     status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4288     if (status < 0)
4289         mlog_errno(status);
4290 
4291     return status;
4292 }
4293 
4294 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
4295 {
4296     int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4297     struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4298     struct ocfs2_super *osb = lockres->l_priv;
4299 
4300     if (!ocfs2_mount_local(osb))
4301         ocfs2_cluster_unlock(osb, lockres, level);
4302 }
4303 
4304 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
4305                        struct ocfs2_lock_res *lockres)
4306 {
4307     int status;
4308     struct ocfs2_unblock_ctl ctl = {0, 0,};
4309     unsigned long flags;
4310 
4311     /* Our reference to the lockres in this function can be
4312      * considered valid until we remove the OCFS2_LOCK_QUEUED
4313      * flag. */
4314 
4315     BUG_ON(!lockres);
4316     BUG_ON(!lockres->l_ops);
4317 
4318     mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
4319 
4320     /* Detect whether a lock has been marked as going away while
4321      * the downconvert thread was processing other things. A lock can
4322      * still be marked with OCFS2_LOCK_FREEING after this check,
4323      * but short circuiting here will still save us some
4324      * performance. */
4325     spin_lock_irqsave(&lockres->l_lock, flags);
4326     if (lockres->l_flags & OCFS2_LOCK_FREEING)
4327         goto unqueue;
4328     spin_unlock_irqrestore(&lockres->l_lock, flags);
4329 
4330     status = ocfs2_unblock_lock(osb, lockres, &ctl);
4331     if (status < 0)
4332         mlog_errno(status);
4333 
4334     spin_lock_irqsave(&lockres->l_lock, flags);
4335 unqueue:
4336     if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
4337         lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
4338     } else
4339         ocfs2_schedule_blocked_lock(osb, lockres);
4340 
4341     mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
4342          ctl.requeue ? "yes" : "no");
4343     spin_unlock_irqrestore(&lockres->l_lock, flags);
4344 
4345     if (ctl.unblock_action != UNBLOCK_CONTINUE
4346         && lockres->l_ops->post_unlock)
4347         lockres->l_ops->post_unlock(osb, lockres);
4348 }
4349 
4350 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4351                     struct ocfs2_lock_res *lockres)
4352 {
4353     unsigned long flags;
4354 
4355     assert_spin_locked(&lockres->l_lock);
4356 
4357     if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4358         /* Do not schedule a lock for downconvert when it's on
4359          * the way to destruction - any nodes wanting access
4360          * to the resource will get it soon. */
4361         mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4362              lockres->l_name, lockres->l_flags);
4363         return;
4364     }
4365 
4366     lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4367 
4368     spin_lock_irqsave(&osb->dc_task_lock, flags);
4369     if (list_empty(&lockres->l_blocked_list)) {
4370         list_add_tail(&lockres->l_blocked_list,
4371                   &osb->blocked_lock_list);
4372         osb->blocked_lock_count++;
4373     }
4374     spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4375 }
4376 
4377 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4378 {
4379     unsigned long processed;
4380     unsigned long flags;
4381     struct ocfs2_lock_res *lockres;
4382 
4383     spin_lock_irqsave(&osb->dc_task_lock, flags);
4384     /* grab this early so we know to try again if a state change and
4385      * wakeup happen part-way through our work */
4386     osb->dc_work_sequence = osb->dc_wake_sequence;
4387 
4388     processed = osb->blocked_lock_count;
4389     /*
4390      * blocked lock processing in this loop might call iput which can
4391      * remove items off osb->blocked_lock_list. Downconvert up to
4392      * 'processed' number of locks, but stop short if we had some
4393      * removed in ocfs2_mark_lockres_freeing when downconverting.
4394      */
4395     while (processed && !list_empty(&osb->blocked_lock_list)) {
4396         lockres = list_entry(osb->blocked_lock_list.next,
4397                      struct ocfs2_lock_res, l_blocked_list);
4398         list_del_init(&lockres->l_blocked_list);
4399         osb->blocked_lock_count--;
4400         spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4401 
4402         BUG_ON(!processed);
4403         processed--;
4404 
4405         ocfs2_process_blocked_lock(osb, lockres);
4406 
4407         spin_lock_irqsave(&osb->dc_task_lock, flags);
4408     }
4409     spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4410 }
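The snapshot of blocked_lock_count bounds a single pass: locks requeued while we work do not extend the pass, and locks removed elsewhere (ocfs2_mark_lockres_freeing()) end it early through the list_empty() check. The bounded-drain shape on its own, as a sketch with hypothetical names:

#include <linux/list.h>
#include <linux/spinlock.h>

static void drain_once(spinlock_t *lock, struct list_head *head,
		       unsigned long *count)
{
	struct list_head *item;
	unsigned long budget, flags;

	spin_lock_irqsave(lock, flags);
	budget = *count;		/* snapshot bounds this pass */
	while (budget && !list_empty(head)) {
		item = head->next;
		list_del_init(item);
		(*count)--;
		budget--;
		spin_unlock_irqrestore(lock, flags);

		/* ... process "item" without holding the lock ... */

		spin_lock_irqsave(lock, flags);
	}
	spin_unlock_irqrestore(lock, flags);
}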
4411 
4412 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4413 {
4414     int empty = 0;
4415     unsigned long flags;
4416 
4417     spin_lock_irqsave(&osb->dc_task_lock, flags);
4418     if (list_empty(&osb->blocked_lock_list))
4419         empty = 1;
4420 
4421     spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4422     return empty;
4423 }
4424 
4425 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4426 {
4427     int should_wake = 0;
4428     unsigned long flags;
4429 
4430     spin_lock_irqsave(&osb->dc_task_lock, flags);
4431     if (osb->dc_work_sequence != osb->dc_wake_sequence)
4432         should_wake = 1;
4433     spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4434 
4435     return should_wake;
4436 }
4437 
4438 static int ocfs2_downconvert_thread(void *arg)
4439 {
4440     struct ocfs2_super *osb = arg;
4441 
4442     /* only quit once we've been asked to stop and there is no more
4443      * work available */
4444     while (!(kthread_should_stop() &&
4445         ocfs2_downconvert_thread_lists_empty(osb))) {
4446 
4447         wait_event_interruptible(osb->dc_event,
4448                      ocfs2_downconvert_thread_should_wake(osb) ||
4449                      kthread_should_stop());
4450 
4451         mlog(0, "downconvert_thread: awoken\n");
4452 
4453         ocfs2_downconvert_thread_do_work(osb);
4454     }
4455 
4456     osb->dc_task = NULL;
4457     return 0;
4458 }
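Note that kthread_should_stop() appears both in the outer loop and in the wait condition: kthread_stop() sets the flag and wakes the task, so a wait that only tested for work could sleep through the stop request. The minimal shape of such a worker, as a sketch with hypothetical names:

#include <linux/kthread.h>
#include <linux/wait.h>

struct my_ctx {
	wait_queue_head_t	wq;
	bool			have_work;
};

static int my_worker(void *arg)
{
	struct my_ctx *ctx = arg;

	while (!kthread_should_stop()) {
		wait_event_interruptible(ctx->wq,
					 READ_ONCE(ctx->have_work) ||
					 kthread_should_stop());
		/* ... drain ctx's work here ... */
	}
	return 0;
}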
4459 
4460 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4461 {
4462     unsigned long flags;
4463 
4464     spin_lock_irqsave(&osb->dc_task_lock, flags);
4465     /* make sure the downconvert thread gets a swipe at whatever changes
4466      * the caller may have made to the lock state */
4467     osb->dc_wake_sequence++;
4468     spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4469     wake_up(&osb->dc_event);
4470 }