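/*
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions herein are originally taken from dlmglue.c in
 * fs/ocfs2.
 */
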
#include <linux/signal.h>
#include <linux/sched/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "../ocfs2_lockingver.h"
#include "../stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "../cluster/masklog.h"

static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct user_lock_res, l_lksb);
}

static inline int user_check_wait_flag(struct user_lock_res *lockres,
				       int flag)
{
	int ret;

	spin_lock(&lockres->l_lock);
	ret = lockres->l_flags & flag;
	spin_unlock(&lockres->l_lock);

	return ret;
}

static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}

static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}

static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return ip->ip_conn;
}

static struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return &ip->ip_vfs_inode;
}

static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
	spin_lock(&lockres->l_lock);
	lockres->l_flags &= ~USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);
}

#define user_log_dlm_error(_func, _stat, _lockres) do {		\
	mlog(ML_ERROR, "Dlm error %d while calling %s on "	\
		"resource %.*s\n", _stat, _func,		\
		_lockres->l_namelen, _lockres->l_name);		\
} while (0)

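/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */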
static inline int user_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
	int status;

	mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level,
	     lockres->l_requested);

	spin_lock(&lockres->l_lock);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
	if (status) {
		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
		     status, lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		return;
	}

	mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
			"Lockres %.*s, requested ivmode. flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

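	/* we're downconverting. */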
	if (lockres->l_requested < lockres->l_level) {
		if (lockres->l_requested <=
		    user_highest_compat_lock_level(lockres->l_blocking)) {
			lockres->l_blocking = DLM_LOCK_NL;
			lockres->l_flags &= ~USER_LOCK_BLOCKED;
		}
	}

	lockres->l_level = lockres->l_requested;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_flags |= USER_LOCK_ATTACHED;
	lockres->l_flags &= ~USER_LOCK_BUSY;

	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	if (!igrab(inode))
		BUG();
}

static void user_dlm_unblock_lock(struct work_struct *work);

static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
		user_dlm_grab_inode_ref(lockres);

		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

		queue_work(user_dlm_worker, &lockres->l_work);
		lockres->l_flags |= USER_LOCK_QUEUED;
	}
}

static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
	int queue = 0;

	if (!(lockres->l_flags & USER_LOCK_BLOCKED))
		return;

	switch (lockres->l_blocking) {
	case DLM_LOCK_EX:
		if (!lockres->l_ex_holders && !lockres->l_ro_holders)
			queue = 1;
		break;
	case DLM_LOCK_PR:
		if (!lockres->l_ex_holders)
			queue = 1;
		break;
	default:
		BUG();
	}

	if (queue)
		__user_dlm_queue_lockres(lockres);
}

static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
	     lockres->l_namelen, lockres->l_name, level, lockres->l_level);

	spin_lock(&lockres->l_lock);
	lockres->l_flags |= USER_LOCK_BLOCKED;
	if (level > lockres->l_blocking)
		lockres->l_blocking = level;

	__user_dlm_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_flags);

	if (status)
		mlog(ML_ERROR, "dlm returns status %d\n", status);

	spin_lock(&lockres->l_lock);
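	/* The teardown flag gets set early during the unlock process,
	 * so test the cancel flag to make sure that this ast isn't
	 * for a concurrent cancel. */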
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
		lockres->l_level = DLM_LOCK_IV;
	} else if (status == DLM_CANCELGRANT) {
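		/* We tried to cancel a convert request, but it was
		 * already granted. Don't clear the busy flag - the
		 * ast should've done this already. */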
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		goto out_noclear;
	} else {
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));

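		/* Cancel succeeded, we want to re-queue */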
		lockres->l_requested = DLM_LOCK_IV; /* cancel an
						    * upconvert
						    * request. */
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
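		/* we want the unblock thread to look at it again
		 * now. */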
		if (lockres->l_flags & USER_LOCK_BLOCKED)
			__user_dlm_queue_lockres(lockres);
	}

	lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

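/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */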
static struct ocfs2_locking_protocol user_dlm_lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast = user_ast,
	.lp_blocking_ast = user_bast,
	.lp_unlock_ast = user_unlock_ast,
};

static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	iput(inode);
}

static void user_dlm_unblock_lock(struct work_struct *work)
{
	int new_level, status;
	struct user_lock_res *lockres =
		container_of(work, struct user_lock_res, l_work);
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);

	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
			"Lockres %.*s, flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

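	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
	 * still set, the downconvert below lets user_ast() clear it. */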
	lockres->l_flags &= ~USER_LOCK_QUEUED;

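	/* It's valid to get here and no longer be blocked - if we get
	 * several basts in a row, we might be queued by the first
	 * one, the unblock thread might run and clear the queued
	 * flag, and finally we might get another bast which re-queues
	 * us and makes another call into the unblock thread. */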
	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_BUSY) {
		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
			mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
			     lockres->l_namelen, lockres->l_name);
			spin_unlock(&lockres->l_lock);
			goto drop_ref;
		}

		lockres->l_flags |= USER_LOCK_IN_CANCEL;
		spin_unlock(&lockres->l_lock);

		status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
					  DLM_LKF_CANCEL);
		if (status)
			user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto drop_ref;
	}

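	/* If we're blocking an exclusive and we have *any* holders,
	 * then requeue. */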
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders, lockres->l_ro_holders);
		goto drop_ref;
	}

	if ((lockres->l_blocking == DLM_LOCK_PR)
	    && lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders);
		goto drop_ref;
	}

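	/* yay, we can downconvert now. */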
	new_level = user_highest_compat_lock_level(lockres->l_blocking);
	lockres->l_requested = new_level;
	lockres->l_flags |= USER_LOCK_BUSY;
	mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
	spin_unlock(&lockres->l_lock);

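	/* need lock downconvert request now */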
	status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
				DLM_LKF_CONVERT|DLM_LKF_VALBLK,
				lockres->l_name,
				lockres->l_namelen);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
		user_recover_from_dlm_error(lockres);
	}

drop_ref:
	user_dlm_drop_inode_ref(lockres);
}

static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
					int level)
{
	switch (level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

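/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */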
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
				  int wanted)
{
	BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

	return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}

int user_dlm_cluster_lock(struct user_lock_res *lockres,
			  int level,
			  int lkm_flags)
{
	int status, local_flags;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		status = -EINVAL;
		goto bail;
	}

	mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
	     lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
	if (signal_pending(current)) {
		status = -ERESTARTSYS;
		goto bail;
	}

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		status = -EAGAIN;
		goto bail;
	}

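	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */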
	if ((lockres->l_flags & USER_LOCK_BUSY) &&
	    (level > lockres->l_level)) {
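		/* someone is sitting in dlm_lock() on this lockres -
		 * wait for their ast to fire before retrying */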
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
	    (!user_may_continue_on_blocked_lock(lockres, level))) {
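		/* the lock is blocked on behalf of another node at an
		 * incompatible level - wait for the downconvert */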
		spin_unlock(&lockres->l_lock);

		user_wait_on_blocked_lock(lockres);
		goto again;
	}

	if (level > lockres->l_level) {
		local_flags = lkm_flags | DLM_LKF_VALBLK;
		if (lockres->l_level != DLM_LOCK_IV)
			local_flags |= DLM_LKF_CONVERT;

		lockres->l_requested = level;
		lockres->l_flags |= USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

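		/* call dlm_lock to upgrade lock now */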
		status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
					local_flags, lockres->l_name,
					lockres->l_namelen);
		if (status) {
			if ((lkm_flags & DLM_LKF_NOQUEUE) &&
			    (status != -EAGAIN))
				user_log_dlm_error("ocfs2_dlm_lock",
						   status, lockres);
			user_recover_from_dlm_error(lockres);
			goto bail;
		}

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	user_dlm_inc_holders(lockres, level);
	spin_unlock(&lockres->l_lock);

	status = 0;
bail:
	return status;
}

static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
					int level)
{
	switch (level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

void user_dlm_cluster_unlock(struct user_lock_res *lockres,
			     int level)
{
	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		return;
	}

	spin_lock(&lockres->l_lock);
	user_dlm_dec_holders(lockres, level);
	__user_dlm_cond_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);
}

void user_dlm_write_lvb(struct inode *inode,
			const char *val,
			unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_EX);
	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	memcpy(lvb, val, len);

	spin_unlock(&lockres->l_lock);
}

bool user_dlm_read_lvb(struct inode *inode, char *val)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;
	bool ret = true;

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_PR);
	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
		memcpy(val, lvb, DLM_LVB_LEN);
	} else
		ret = false;

	spin_unlock(&lockres->l_lock);
	return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
			    struct dentry *dentry)
{
	memset(lockres, 0, sizeof(*lockres));

	spin_lock_init(&lockres->l_lock);
	init_waitqueue_head(&lockres->l_event);
	lockres->l_level = DLM_LOCK_IV;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_blocking = DLM_LOCK_IV;

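	/* should have been checked before getting here. */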
	BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

	memcpy(lockres->l_name,
	       dentry->d_name.name,
	       dentry->d_name.len);
	lockres->l_namelen = dentry->d_name.len;
}

int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
	int status = -EBUSY;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

	while (lockres->l_flags & USER_LOCK_BUSY) {
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);

		spin_lock(&lockres->l_lock);
	}

	if (lockres->l_ro_holders || lockres->l_ex_holders) {
		lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	status = 0;
	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
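		/*
		 * lock is never requested, leave USER_LOCK_IN_TEARDOWN set
		 * to avoid new lock request coming in.
		 */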
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags |= USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);

	status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
	if (status) {
		spin_lock(&lockres->l_lock);
		lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
		lockres->l_flags &= ~USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);
		user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto bail;
	}

	user_wait_on_busy_lock(lockres);

	status = 0;
bail:
	return status;
}

static void user_dlm_recovery_handler_noop(int node_num,
					   void *recovery_data)
{
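	/* We ignore recovery events */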
	return;
}

void user_dlm_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}

struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
{
	int rc;
	struct ocfs2_cluster_connection *conn;

	rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
					    &user_dlm_lproto,
					    user_dlm_recovery_handler_noop,
					    NULL, &conn);
	if (rc)
		mlog_errno(rc);

	return rc ? ERR_PTR(rc) : conn;
}

void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
	ocfs2_cluster_disconnect(conn, 0);
}