0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056 #include <trace/events/dlm.h>
0057
0058 #include <linux/types.h>
0059 #include <linux/rbtree.h>
0060 #include <linux/slab.h>
0061 #include "dlm_internal.h"
0062 #include <linux/dlm_device.h>
0063 #include "memory.h"
0064 #include "midcomms.h"
0065 #include "requestqueue.h"
0066 #include "util.h"
0067 #include "dir.h"
0068 #include "member.h"
0069 #include "lockspace.h"
0070 #include "ast.h"
0071 #include "lock.h"
0072 #include "rcom.h"
0073 #include "recover.h"
0074 #include "lvb_table.h"
0075 #include "user.h"
0076 #include "config.h"
0077
0078 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
0079 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
0080 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
0081 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
0082 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
0083 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
0084 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
0085 static int send_remove(struct dlm_rsb *r);
0086 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
0087 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
0088 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
0089 struct dlm_message *ms);
0090 static int receive_extralen(struct dlm_message *ms);
0091 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
0092 static void del_timeout(struct dlm_lkb *lkb);
0093 static void toss_rsb(struct kref *kref);
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103 static const int __dlm_compat_matrix[8][8] = {
0104
0105 {1, 1, 1, 1, 1, 1, 1, 0},
0106 {1, 1, 1, 1, 1, 1, 1, 0},
0107 {1, 1, 1, 1, 1, 1, 0, 0},
0108 {1, 1, 1, 1, 0, 0, 0, 0},
0109 {1, 1, 1, 0, 1, 0, 0, 0},
0110 {1, 1, 1, 0, 0, 0, 0, 0},
0111 {1, 1, 0, 0, 0, 0, 0, 0},
0112 {0, 0, 0, 0, 0, 0, 0, 0}
0113 };
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124 const int dlm_lvb_operations[8][8] = {
0125
0126 { -1, 1, 1, 1, 1, 1, 1, -1 },
0127 { -1, 1, 1, 1, 1, 1, 1, 0 },
0128 { -1, -1, 1, 1, 1, 1, 1, 0 },
0129 { -1, -1, -1, 1, 1, 1, 1, 0 },
0130 { -1, -1, -1, -1, 1, 1, 1, 0 },
0131 { -1, 0, 0, 0, 0, 0, 1, 0 },
0132 { -1, 0, 0, 0, 0, 0, 0, 0 },
0133 { -1, 0, 0, 0, 0, 0, 0, 0 }
0134 };
0135
0136 #define modes_compat(gr, rq) \
0137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
0138
0139 int dlm_modes_compat(int mode1, int mode2)
0140 {
0141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
0142 }
0143
0144
0145
0146
0147
0148
0149
0150 static const int __quecvt_compat_matrix[8][8] = {
0151
0152 {0, 0, 0, 0, 0, 0, 0, 0},
0153 {0, 0, 1, 1, 1, 1, 1, 0},
0154 {0, 0, 0, 1, 1, 1, 1, 0},
0155 {0, 0, 0, 0, 1, 1, 1, 0},
0156 {0, 0, 0, 1, 0, 1, 1, 0},
0157 {0, 0, 0, 0, 0, 0, 1, 0},
0158 {0, 0, 0, 0, 0, 0, 0, 0},
0159 {0, 0, 0, 0, 0, 0, 0, 0}
0160 };
0161
0162 void dlm_print_lkb(struct dlm_lkb *lkb)
0163 {
0164 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
0165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
0166 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
0167 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
0168 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
0169 (unsigned long long)lkb->lkb_recover_seq);
0170 }
0171
0172 static void dlm_print_rsb(struct dlm_rsb *r)
0173 {
0174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
0175 "rlc %d name %s\n",
0176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
0177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
0178 r->res_name);
0179 }
0180
0181 void dlm_dump_rsb(struct dlm_rsb *r)
0182 {
0183 struct dlm_lkb *lkb;
0184
0185 dlm_print_rsb(r);
0186
0187 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
0188 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
0189 printk(KERN_ERR "rsb lookup list\n");
0190 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
0191 dlm_print_lkb(lkb);
0192 printk(KERN_ERR "rsb grant queue:\n");
0193 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
0194 dlm_print_lkb(lkb);
0195 printk(KERN_ERR "rsb convert queue:\n");
0196 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
0197 dlm_print_lkb(lkb);
0198 printk(KERN_ERR "rsb wait queue:\n");
0199 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
0200 dlm_print_lkb(lkb);
0201 }
0202
0203
0204
0205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
0206 {
0207 down_read(&ls->ls_in_recovery);
0208 }
0209
0210 void dlm_unlock_recovery(struct dlm_ls *ls)
0211 {
0212 up_read(&ls->ls_in_recovery);
0213 }
0214
0215 int dlm_lock_recovery_try(struct dlm_ls *ls)
0216 {
0217 return down_read_trylock(&ls->ls_in_recovery);
0218 }
0219
0220 static inline int can_be_queued(struct dlm_lkb *lkb)
0221 {
0222 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
0223 }
0224
0225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
0226 {
0227 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
0228 }
0229
0230 static inline int is_demoted(struct dlm_lkb *lkb)
0231 {
0232 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
0233 }
0234
0235 static inline int is_altmode(struct dlm_lkb *lkb)
0236 {
0237 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
0238 }
0239
0240 static inline int is_granted(struct dlm_lkb *lkb)
0241 {
0242 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
0243 }
0244
0245 static inline int is_remote(struct dlm_rsb *r)
0246 {
0247 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
0248 return !!r->res_nodeid;
0249 }
0250
0251 static inline int is_process_copy(struct dlm_lkb *lkb)
0252 {
0253 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
0254 }
0255
0256 static inline int is_master_copy(struct dlm_lkb *lkb)
0257 {
0258 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
0259 }
0260
0261 static inline int middle_conversion(struct dlm_lkb *lkb)
0262 {
0263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
0264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
0265 return 1;
0266 return 0;
0267 }
0268
0269 static inline int down_conversion(struct dlm_lkb *lkb)
0270 {
0271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
0272 }
0273
0274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
0275 {
0276 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
0277 }
0278
0279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
0280 {
0281 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
0282 }
0283
0284 static inline int is_overlap(struct dlm_lkb *lkb)
0285 {
0286 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
0287 DLM_IFL_OVERLAP_CANCEL));
0288 }
0289
0290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
0291 {
0292 if (is_master_copy(lkb))
0293 return;
0294
0295 del_timeout(lkb);
0296
0297 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
0298
0299 #ifdef CONFIG_DLM_DEPRECATED_API
0300
0301
0302 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
0303 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
0304 rv = -ETIMEDOUT;
0305 }
0306 #endif
0307
0308 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
0309 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
0310 rv = -EDEADLK;
0311 }
0312
0313 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
0314 }
0315
0316 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
0317 {
0318 queue_cast(r, lkb,
0319 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
0320 }
0321
0322 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
0323 {
0324 if (is_master_copy(lkb)) {
0325 send_bast(r, lkb, rqmode);
0326 } else {
0327 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
0328 }
0329 }
0330
0331
0332
0333
0334
0335
0336
0337
0338 static inline void hold_rsb(struct dlm_rsb *r)
0339 {
0340 kref_get(&r->res_ref);
0341 }
0342
0343 void dlm_hold_rsb(struct dlm_rsb *r)
0344 {
0345 hold_rsb(r);
0346 }
0347
0348
0349
0350
0351 static void put_rsb(struct dlm_rsb *r)
0352 {
0353 struct dlm_ls *ls = r->res_ls;
0354 uint32_t bucket = r->res_bucket;
0355 int rv;
0356
0357 rv = kref_put_lock(&r->res_ref, toss_rsb,
0358 &ls->ls_rsbtbl[bucket].lock);
0359 if (rv)
0360 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
0361 }
0362
0363 void dlm_put_rsb(struct dlm_rsb *r)
0364 {
0365 put_rsb(r);
0366 }
0367
0368 static int pre_rsb_struct(struct dlm_ls *ls)
0369 {
0370 struct dlm_rsb *r1, *r2;
0371 int count = 0;
0372
0373 spin_lock(&ls->ls_new_rsb_spin);
0374 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
0375 spin_unlock(&ls->ls_new_rsb_spin);
0376 return 0;
0377 }
0378 spin_unlock(&ls->ls_new_rsb_spin);
0379
0380 r1 = dlm_allocate_rsb(ls);
0381 r2 = dlm_allocate_rsb(ls);
0382
0383 spin_lock(&ls->ls_new_rsb_spin);
0384 if (r1) {
0385 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
0386 ls->ls_new_rsb_count++;
0387 }
0388 if (r2) {
0389 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
0390 ls->ls_new_rsb_count++;
0391 }
0392 count = ls->ls_new_rsb_count;
0393 spin_unlock(&ls->ls_new_rsb_spin);
0394
0395 if (!count)
0396 return -ENOMEM;
0397 return 0;
0398 }
0399
0400
0401
0402
0403
0404 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
0405 struct dlm_rsb **r_ret)
0406 {
0407 struct dlm_rsb *r;
0408 int count;
0409
0410 spin_lock(&ls->ls_new_rsb_spin);
0411 if (list_empty(&ls->ls_new_rsb)) {
0412 count = ls->ls_new_rsb_count;
0413 spin_unlock(&ls->ls_new_rsb_spin);
0414 log_debug(ls, "find_rsb retry %d %d %s",
0415 count, dlm_config.ci_new_rsb_count, name);
0416 return -EAGAIN;
0417 }
0418
0419 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
0420 list_del(&r->res_hashchain);
0421
0422 memset(&r->res_hashnode, 0, sizeof(struct rb_node));
0423 ls->ls_new_rsb_count--;
0424 spin_unlock(&ls->ls_new_rsb_spin);
0425
0426 r->res_ls = ls;
0427 r->res_length = len;
0428 memcpy(r->res_name, name, len);
0429 mutex_init(&r->res_mutex);
0430
0431 INIT_LIST_HEAD(&r->res_lookup);
0432 INIT_LIST_HEAD(&r->res_grantqueue);
0433 INIT_LIST_HEAD(&r->res_convertqueue);
0434 INIT_LIST_HEAD(&r->res_waitqueue);
0435 INIT_LIST_HEAD(&r->res_root_list);
0436 INIT_LIST_HEAD(&r->res_recover_list);
0437
0438 *r_ret = r;
0439 return 0;
0440 }
0441
0442 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
0443 {
0444 char maxname[DLM_RESNAME_MAXLEN];
0445
0446 memset(maxname, 0, DLM_RESNAME_MAXLEN);
0447 memcpy(maxname, name, nlen);
0448 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
0449 }
0450
0451 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
0452 struct dlm_rsb **r_ret)
0453 {
0454 struct rb_node *node = tree->rb_node;
0455 struct dlm_rsb *r;
0456 int rc;
0457
0458 while (node) {
0459 r = rb_entry(node, struct dlm_rsb, res_hashnode);
0460 rc = rsb_cmp(r, name, len);
0461 if (rc < 0)
0462 node = node->rb_left;
0463 else if (rc > 0)
0464 node = node->rb_right;
0465 else
0466 goto found;
0467 }
0468 *r_ret = NULL;
0469 return -EBADR;
0470
0471 found:
0472 *r_ret = r;
0473 return 0;
0474 }
0475
0476 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
0477 {
0478 struct rb_node **newn = &tree->rb_node;
0479 struct rb_node *parent = NULL;
0480 int rc;
0481
0482 while (*newn) {
0483 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
0484 res_hashnode);
0485
0486 parent = *newn;
0487 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
0488 if (rc < 0)
0489 newn = &parent->rb_left;
0490 else if (rc > 0)
0491 newn = &parent->rb_right;
0492 else {
0493 log_print("rsb_insert match");
0494 dlm_dump_rsb(rsb);
0495 dlm_dump_rsb(cur);
0496 return -EEXIST;
0497 }
0498 }
0499
0500 rb_link_node(&rsb->res_hashnode, parent, newn);
0501 rb_insert_color(&rsb->res_hashnode, tree);
0502 return 0;
0503 }
0504
0505
0506
0507
0508
0509
0510
0511
0512
0513
0514
0515
0516
0517
0518
0519
0520
0521
0522
0523
0524
0525
0526
0527
0528
0529
0530
0531
0532
0533
0534
0535
0536
0537
0538
0539
0540
0541
0542
0543
0544
0545
0546
0547
0548
0549 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
0550 uint32_t hash, uint32_t b,
0551 int dir_nodeid, int from_nodeid,
0552 unsigned int flags, struct dlm_rsb **r_ret)
0553 {
0554 struct dlm_rsb *r = NULL;
0555 int our_nodeid = dlm_our_nodeid();
0556 int from_local = 0;
0557 int from_other = 0;
0558 int from_dir = 0;
0559 int create = 0;
0560 int error;
0561
0562 if (flags & R_RECEIVE_REQUEST) {
0563 if (from_nodeid == dir_nodeid)
0564 from_dir = 1;
0565 else
0566 from_other = 1;
0567 } else if (flags & R_REQUEST) {
0568 from_local = 1;
0569 }
0570
0571
0572
0573
0574
0575
0576
0577
0578
0579
0580
0581
0582
0583
0584
0585
0586 if (from_local || from_dir ||
0587 (from_other && (dir_nodeid == our_nodeid))) {
0588 create = 1;
0589 }
0590
0591 retry:
0592 if (create) {
0593 error = pre_rsb_struct(ls);
0594 if (error < 0)
0595 goto out;
0596 }
0597
0598 spin_lock(&ls->ls_rsbtbl[b].lock);
0599
0600 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
0601 if (error)
0602 goto do_toss;
0603
0604
0605
0606
0607
0608 kref_get(&r->res_ref);
0609 goto out_unlock;
0610
0611
0612 do_toss:
0613 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
0614 if (error)
0615 goto do_new;
0616
0617
0618
0619
0620
0621
0622
0623
0624 if ((r->res_master_nodeid != our_nodeid) && from_other) {
0625
0626
0627 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
0628 from_nodeid, r->res_master_nodeid, dir_nodeid,
0629 r->res_name);
0630 error = -ENOTBLK;
0631 goto out_unlock;
0632 }
0633
0634 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
0635
0636 log_error(ls, "find_rsb toss from_dir %d master %d",
0637 from_nodeid, r->res_master_nodeid);
0638 dlm_print_rsb(r);
0639
0640 r->res_master_nodeid = our_nodeid;
0641 r->res_nodeid = 0;
0642 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
0643 r->res_first_lkid = 0;
0644 }
0645
0646 if (from_local && (r->res_master_nodeid != our_nodeid)) {
0647
0648
0649 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
0650 r->res_first_lkid = 0;
0651 }
0652
0653 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
0654 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
0655 goto out_unlock;
0656
0657
0658 do_new:
0659
0660
0661
0662
0663 if (error == -EBADR && !create)
0664 goto out_unlock;
0665
0666 error = get_rsb_struct(ls, name, len, &r);
0667 if (error == -EAGAIN) {
0668 spin_unlock(&ls->ls_rsbtbl[b].lock);
0669 goto retry;
0670 }
0671 if (error)
0672 goto out_unlock;
0673
0674 r->res_hash = hash;
0675 r->res_bucket = b;
0676 r->res_dir_nodeid = dir_nodeid;
0677 kref_init(&r->res_ref);
0678
0679 if (from_dir) {
0680
0681 log_debug(ls, "find_rsb new from_dir %d recreate %s",
0682 from_nodeid, r->res_name);
0683 r->res_master_nodeid = our_nodeid;
0684 r->res_nodeid = 0;
0685 goto out_add;
0686 }
0687
0688 if (from_other && (dir_nodeid != our_nodeid)) {
0689
0690 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
0691 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
0692 dlm_free_rsb(r);
0693 r = NULL;
0694 error = -ENOTBLK;
0695 goto out_unlock;
0696 }
0697
0698 if (from_other) {
0699 log_debug(ls, "find_rsb new from_other %d dir %d %s",
0700 from_nodeid, dir_nodeid, r->res_name);
0701 }
0702
0703 if (dir_nodeid == our_nodeid) {
0704
0705
0706 r->res_master_nodeid = our_nodeid;
0707 r->res_nodeid = 0;
0708 } else {
0709
0710 r->res_master_nodeid = 0;
0711 r->res_nodeid = -1;
0712 }
0713
0714 out_add:
0715 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
0716 out_unlock:
0717 spin_unlock(&ls->ls_rsbtbl[b].lock);
0718 out:
0719 *r_ret = r;
0720 return error;
0721 }
0722
0723
0724
0725
0726
0727 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
0728 uint32_t hash, uint32_t b,
0729 int dir_nodeid, int from_nodeid,
0730 unsigned int flags, struct dlm_rsb **r_ret)
0731 {
0732 struct dlm_rsb *r = NULL;
0733 int our_nodeid = dlm_our_nodeid();
0734 int recover = (flags & R_RECEIVE_RECOVER);
0735 int error;
0736
0737 retry:
0738 error = pre_rsb_struct(ls);
0739 if (error < 0)
0740 goto out;
0741
0742 spin_lock(&ls->ls_rsbtbl[b].lock);
0743
0744 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
0745 if (error)
0746 goto do_toss;
0747
0748
0749
0750
0751
0752 kref_get(&r->res_ref);
0753 goto out_unlock;
0754
0755
0756 do_toss:
0757 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
0758 if (error)
0759 goto do_new;
0760
0761
0762
0763
0764
0765
0766
0767 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
0768
0769
0770 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
0771 from_nodeid, r->res_master_nodeid, dir_nodeid);
0772 dlm_print_rsb(r);
0773 error = -ENOTBLK;
0774 goto out_unlock;
0775 }
0776
0777 if (!recover && (r->res_master_nodeid != our_nodeid) &&
0778 (dir_nodeid == our_nodeid)) {
0779
0780
0781 log_error(ls, "find_rsb toss our %d master %d dir %d",
0782 our_nodeid, r->res_master_nodeid, dir_nodeid);
0783 dlm_print_rsb(r);
0784 r->res_master_nodeid = our_nodeid;
0785 r->res_nodeid = 0;
0786 }
0787
0788 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
0789 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
0790 goto out_unlock;
0791
0792
0793 do_new:
0794
0795
0796
0797
0798 error = get_rsb_struct(ls, name, len, &r);
0799 if (error == -EAGAIN) {
0800 spin_unlock(&ls->ls_rsbtbl[b].lock);
0801 goto retry;
0802 }
0803 if (error)
0804 goto out_unlock;
0805
0806 r->res_hash = hash;
0807 r->res_bucket = b;
0808 r->res_dir_nodeid = dir_nodeid;
0809 r->res_master_nodeid = dir_nodeid;
0810 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
0811 kref_init(&r->res_ref);
0812
0813 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
0814 out_unlock:
0815 spin_unlock(&ls->ls_rsbtbl[b].lock);
0816 out:
0817 *r_ret = r;
0818 return error;
0819 }
0820
0821 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
0822 unsigned int flags, struct dlm_rsb **r_ret)
0823 {
0824 uint32_t hash, b;
0825 int dir_nodeid;
0826
0827 if (len > DLM_RESNAME_MAXLEN)
0828 return -EINVAL;
0829
0830 hash = jhash(name, len, 0);
0831 b = hash & (ls->ls_rsbtbl_size - 1);
0832
0833 dir_nodeid = dlm_hash2nodeid(ls, hash);
0834
0835 if (dlm_no_directory(ls))
0836 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
0837 from_nodeid, flags, r_ret);
0838 else
0839 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
0840 from_nodeid, flags, r_ret);
0841 }
0842
0843
0844
0845
0846 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
0847 int from_nodeid)
0848 {
0849 if (dlm_no_directory(ls)) {
0850 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
0851 from_nodeid, r->res_master_nodeid,
0852 r->res_dir_nodeid);
0853 dlm_print_rsb(r);
0854 return -ENOTBLK;
0855 }
0856
0857 if (from_nodeid != r->res_dir_nodeid) {
0858
0859
0860
0861
0862 if (r->res_master_nodeid) {
0863 log_debug(ls, "validate master from_other %d master %d "
0864 "dir %d first %x %s", from_nodeid,
0865 r->res_master_nodeid, r->res_dir_nodeid,
0866 r->res_first_lkid, r->res_name);
0867 }
0868 return -ENOTBLK;
0869 } else {
0870
0871
0872
0873 if (r->res_master_nodeid) {
0874 log_error(ls, "validate master from_dir %d master %d "
0875 "first %x %s",
0876 from_nodeid, r->res_master_nodeid,
0877 r->res_first_lkid, r->res_name);
0878 }
0879
0880 r->res_master_nodeid = dlm_our_nodeid();
0881 r->res_nodeid = 0;
0882 return 0;
0883 }
0884 }
0885
0886 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
0887 int from_nodeid, bool toss_list, unsigned int flags,
0888 int *r_nodeid, int *result)
0889 {
0890 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
0891 int from_master = (flags & DLM_LU_RECOVER_DIR);
0892
0893 if (r->res_dir_nodeid != our_nodeid) {
0894
0895 log_error(ls, "%s res_dir %d our %d %s", __func__,
0896 r->res_dir_nodeid, our_nodeid, r->res_name);
0897 r->res_dir_nodeid = our_nodeid;
0898 }
0899
0900 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
0901
0902
0903
0904
0905
0906
0907 r->res_master_nodeid = from_nodeid;
0908 r->res_nodeid = from_nodeid;
0909 rsb_set_flag(r, RSB_NEW_MASTER);
0910
0911 if (toss_list) {
0912
0913 log_error(ls, "%s fix_master on toss", __func__);
0914 dlm_dump_rsb(r);
0915 }
0916 }
0917
0918 if (from_master && (r->res_master_nodeid != from_nodeid)) {
0919
0920
0921
0922
0923
0924 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
0925 __func__, from_nodeid, r->res_master_nodeid,
0926 r->res_nodeid, r->res_first_lkid, r->res_name);
0927
0928 if (r->res_master_nodeid == our_nodeid) {
0929 log_error(ls, "from_master %d our_master", from_nodeid);
0930 dlm_dump_rsb(r);
0931 goto ret_assign;
0932 }
0933
0934 r->res_master_nodeid = from_nodeid;
0935 r->res_nodeid = from_nodeid;
0936 rsb_set_flag(r, RSB_NEW_MASTER);
0937 }
0938
0939 if (!r->res_master_nodeid) {
0940
0941
0942
0943
0944 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
0945 from_nodeid, r->res_first_lkid, r->res_name);
0946 r->res_master_nodeid = from_nodeid;
0947 r->res_nodeid = from_nodeid;
0948 }
0949
0950 if (!from_master && !fix_master &&
0951 (r->res_master_nodeid == from_nodeid)) {
0952
0953
0954
0955
0956
0957 log_limit(ls, "%s from master %d flags %x first %x %s",
0958 __func__, from_nodeid, flags, r->res_first_lkid,
0959 r->res_name);
0960 }
0961
0962 ret_assign:
0963 *r_nodeid = r->res_master_nodeid;
0964 if (result)
0965 *result = DLM_LU_MATCH;
0966 }
0967
0968
0969
0970
0971
0972
0973
0974
0975
0976
0977
0978
0979
0980
0981
0982
0983
0984
0985
0986
0987
0988
0989
0990
0991
0992
0993
0994
0995
0996
0997 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
0998 unsigned int flags, int *r_nodeid, int *result)
0999 {
1000 struct dlm_rsb *r = NULL;
1001 uint32_t hash, b;
1002 int our_nodeid = dlm_our_nodeid();
1003 int dir_nodeid, error;
1004
1005 if (len > DLM_RESNAME_MAXLEN)
1006 return -EINVAL;
1007
1008 if (from_nodeid == our_nodeid) {
1009 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1010 our_nodeid, flags);
1011 return -EINVAL;
1012 }
1013
1014 hash = jhash(name, len, 0);
1015 b = hash & (ls->ls_rsbtbl_size - 1);
1016
1017 dir_nodeid = dlm_hash2nodeid(ls, hash);
1018 if (dir_nodeid != our_nodeid) {
1019 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1020 from_nodeid, dir_nodeid, our_nodeid, hash,
1021 ls->ls_num_nodes);
1022 *r_nodeid = -1;
1023 return -EINVAL;
1024 }
1025
1026 retry:
1027 error = pre_rsb_struct(ls);
1028 if (error < 0)
1029 return error;
1030
1031 spin_lock(&ls->ls_rsbtbl[b].lock);
1032 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1033 if (!error) {
1034
1035
1036
1037
1038 hold_rsb(r);
1039 spin_unlock(&ls->ls_rsbtbl[b].lock);
1040 lock_rsb(r);
1041
1042 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1043 flags, r_nodeid, result);
1044
1045
1046 unlock_rsb(r);
1047 put_rsb(r);
1048
1049 return 0;
1050 }
1051
1052 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1053 if (error)
1054 goto not_found;
1055
1056
1057
1058
1059
1060 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1061 r_nodeid, result);
1062
1063 r->res_toss_time = jiffies;
1064
1065 spin_unlock(&ls->ls_rsbtbl[b].lock);
1066
1067 return 0;
1068
1069 not_found:
1070 error = get_rsb_struct(ls, name, len, &r);
1071 if (error == -EAGAIN) {
1072 spin_unlock(&ls->ls_rsbtbl[b].lock);
1073 goto retry;
1074 }
1075 if (error)
1076 goto out_unlock;
1077
1078 r->res_hash = hash;
1079 r->res_bucket = b;
1080 r->res_dir_nodeid = our_nodeid;
1081 r->res_master_nodeid = from_nodeid;
1082 r->res_nodeid = from_nodeid;
1083 kref_init(&r->res_ref);
1084 r->res_toss_time = jiffies;
1085
1086 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1087 if (error) {
1088
1089 dlm_free_rsb(r);
1090 spin_unlock(&ls->ls_rsbtbl[b].lock);
1091 goto retry;
1092 }
1093
1094 if (result)
1095 *result = DLM_LU_ADD;
1096 *r_nodeid = from_nodeid;
1097 out_unlock:
1098 spin_unlock(&ls->ls_rsbtbl[b].lock);
1099 return error;
1100 }
1101
1102 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1103 {
1104 struct rb_node *n;
1105 struct dlm_rsb *r;
1106 int i;
1107
1108 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1109 spin_lock(&ls->ls_rsbtbl[i].lock);
1110 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1111 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1112 if (r->res_hash == hash)
1113 dlm_dump_rsb(r);
1114 }
1115 spin_unlock(&ls->ls_rsbtbl[i].lock);
1116 }
1117 }
1118
1119 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1120 {
1121 struct dlm_rsb *r = NULL;
1122 uint32_t hash, b;
1123 int error;
1124
1125 hash = jhash(name, len, 0);
1126 b = hash & (ls->ls_rsbtbl_size - 1);
1127
1128 spin_lock(&ls->ls_rsbtbl[b].lock);
1129 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1130 if (!error)
1131 goto out_dump;
1132
1133 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1134 if (error)
1135 goto out;
1136 out_dump:
1137 dlm_dump_rsb(r);
1138 out:
1139 spin_unlock(&ls->ls_rsbtbl[b].lock);
1140 }
1141
1142 static void toss_rsb(struct kref *kref)
1143 {
1144 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1145 struct dlm_ls *ls = r->res_ls;
1146
1147 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1148 kref_init(&r->res_ref);
1149 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1150 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1151 r->res_toss_time = jiffies;
1152 ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1153 if (r->res_lvbptr) {
1154 dlm_free_lvb(r->res_lvbptr);
1155 r->res_lvbptr = NULL;
1156 }
1157 }
1158
1159
1160
1161 static void unhold_rsb(struct dlm_rsb *r)
1162 {
1163 int rv;
1164 rv = kref_put(&r->res_ref, toss_rsb);
1165 DLM_ASSERT(!rv, dlm_dump_rsb(r););
1166 }
1167
1168 static void kill_rsb(struct kref *kref)
1169 {
1170 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1171
1172
1173
1174
1175 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1176 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1177 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1178 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1179 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1180 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1181 }
1182
1183
1184
1185
1186 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1187 {
1188 hold_rsb(r);
1189 lkb->lkb_resource = r;
1190 }
1191
1192 static void detach_lkb(struct dlm_lkb *lkb)
1193 {
1194 if (lkb->lkb_resource) {
1195 put_rsb(lkb->lkb_resource);
1196 lkb->lkb_resource = NULL;
1197 }
1198 }
1199
1200 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1201 int start, int end)
1202 {
1203 struct dlm_lkb *lkb;
1204 int rv;
1205
1206 lkb = dlm_allocate_lkb(ls);
1207 if (!lkb)
1208 return -ENOMEM;
1209
1210 lkb->lkb_nodeid = -1;
1211 lkb->lkb_grmode = DLM_LOCK_IV;
1212 kref_init(&lkb->lkb_ref);
1213 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1214 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1215 #ifdef CONFIG_DLM_DEPRECATED_API
1216 INIT_LIST_HEAD(&lkb->lkb_time_list);
1217 #endif
1218 INIT_LIST_HEAD(&lkb->lkb_cb_list);
1219 mutex_init(&lkb->lkb_cb_mutex);
1220 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1221
1222 idr_preload(GFP_NOFS);
1223 spin_lock(&ls->ls_lkbidr_spin);
1224 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1225 if (rv >= 0)
1226 lkb->lkb_id = rv;
1227 spin_unlock(&ls->ls_lkbidr_spin);
1228 idr_preload_end();
1229
1230 if (rv < 0) {
1231 log_error(ls, "create_lkb idr error %d", rv);
1232 dlm_free_lkb(lkb);
1233 return rv;
1234 }
1235
1236 *lkb_ret = lkb;
1237 return 0;
1238 }
1239
1240 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1241 {
1242 return _create_lkb(ls, lkb_ret, 1, 0);
1243 }
1244
1245 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1246 {
1247 struct dlm_lkb *lkb;
1248
1249 spin_lock(&ls->ls_lkbidr_spin);
1250 lkb = idr_find(&ls->ls_lkbidr, lkid);
1251 if (lkb)
1252 kref_get(&lkb->lkb_ref);
1253 spin_unlock(&ls->ls_lkbidr_spin);
1254
1255 *lkb_ret = lkb;
1256 return lkb ? 0 : -ENOENT;
1257 }
1258
1259 static void kill_lkb(struct kref *kref)
1260 {
1261 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1262
1263
1264
1265
1266 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1267 }
1268
1269
1270
1271
1272 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1273 {
1274 uint32_t lkid = lkb->lkb_id;
1275 int rv;
1276
1277 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1278 &ls->ls_lkbidr_spin);
1279 if (rv) {
1280 idr_remove(&ls->ls_lkbidr, lkid);
1281 spin_unlock(&ls->ls_lkbidr_spin);
1282
1283 detach_lkb(lkb);
1284
1285
1286 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1287 dlm_free_lvb(lkb->lkb_lvbptr);
1288 dlm_free_lkb(lkb);
1289 }
1290
1291 return rv;
1292 }
1293
1294 int dlm_put_lkb(struct dlm_lkb *lkb)
1295 {
1296 struct dlm_ls *ls;
1297
1298 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1299 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1300
1301 ls = lkb->lkb_resource->res_ls;
1302 return __put_lkb(ls, lkb);
1303 }
1304
1305
1306
1307
1308 static inline void hold_lkb(struct dlm_lkb *lkb)
1309 {
1310 kref_get(&lkb->lkb_ref);
1311 }
1312
1313 static void unhold_lkb_assert(struct kref *kref)
1314 {
1315 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1316
1317 DLM_ASSERT(false, dlm_print_lkb(lkb););
1318 }
1319
1320
1321
1322
1323
1324
1325 static inline void unhold_lkb(struct dlm_lkb *lkb)
1326 {
1327 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1328 }
1329
1330 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1331 int mode)
1332 {
1333 struct dlm_lkb *lkb = NULL, *iter;
1334
1335 list_for_each_entry(iter, head, lkb_statequeue)
1336 if (iter->lkb_rqmode < mode) {
1337 lkb = iter;
1338 list_add_tail(new, &iter->lkb_statequeue);
1339 break;
1340 }
1341
1342 if (!lkb)
1343 list_add_tail(new, head);
1344 }
1345
1346
1347
1348 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1349 {
1350 kref_get(&lkb->lkb_ref);
1351
1352 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1353
1354 lkb->lkb_timestamp = ktime_get();
1355
1356 lkb->lkb_status = status;
1357
1358 switch (status) {
1359 case DLM_LKSTS_WAITING:
1360 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1362 else
1363 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1364 break;
1365 case DLM_LKSTS_GRANTED:
1366
1367 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1368 lkb->lkb_grmode);
1369 break;
1370 case DLM_LKSTS_CONVERT:
1371 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1372 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1373 else
1374 list_add_tail(&lkb->lkb_statequeue,
1375 &r->res_convertqueue);
1376 break;
1377 default:
1378 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1379 }
1380 }
1381
1382 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1383 {
1384 lkb->lkb_status = 0;
1385 list_del(&lkb->lkb_statequeue);
1386 unhold_lkb(lkb);
1387 }
1388
1389 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1390 {
1391 hold_lkb(lkb);
1392 del_lkb(r, lkb);
1393 add_lkb(r, lkb, sts);
1394 unhold_lkb(lkb);
1395 }
1396
1397 static int msg_reply_type(int mstype)
1398 {
1399 switch (mstype) {
1400 case DLM_MSG_REQUEST:
1401 return DLM_MSG_REQUEST_REPLY;
1402 case DLM_MSG_CONVERT:
1403 return DLM_MSG_CONVERT_REPLY;
1404 case DLM_MSG_UNLOCK:
1405 return DLM_MSG_UNLOCK_REPLY;
1406 case DLM_MSG_CANCEL:
1407 return DLM_MSG_CANCEL_REPLY;
1408 case DLM_MSG_LOOKUP:
1409 return DLM_MSG_LOOKUP_REPLY;
1410 }
1411 return -1;
1412 }
1413
1414
1415
1416
1417 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1418 {
1419 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1420 int error = 0;
1421
1422 mutex_lock(&ls->ls_waiters_mutex);
1423
1424 if (is_overlap_unlock(lkb) ||
1425 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1426 error = -EINVAL;
1427 goto out;
1428 }
1429
1430 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1431 switch (mstype) {
1432 case DLM_MSG_UNLOCK:
1433 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1434 break;
1435 case DLM_MSG_CANCEL:
1436 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1437 break;
1438 default:
1439 error = -EBUSY;
1440 goto out;
1441 }
1442 lkb->lkb_wait_count++;
1443 hold_lkb(lkb);
1444
1445 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1446 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1447 lkb->lkb_wait_count, lkb->lkb_flags);
1448 goto out;
1449 }
1450
1451 DLM_ASSERT(!lkb->lkb_wait_count,
1452 dlm_print_lkb(lkb);
1453 printk("wait_count %d\n", lkb->lkb_wait_count););
1454
1455 lkb->lkb_wait_count++;
1456 lkb->lkb_wait_type = mstype;
1457 lkb->lkb_wait_nodeid = to_nodeid;
1458 hold_lkb(lkb);
1459 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1460 out:
1461 if (error)
1462 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1463 lkb->lkb_id, error, lkb->lkb_flags, mstype,
1464 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1465 mutex_unlock(&ls->ls_waiters_mutex);
1466 return error;
1467 }
1468
1469
1470
1471
1472
1473
1474 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1475 struct dlm_message *ms)
1476 {
1477 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1478 int overlap_done = 0;
1479
1480 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1481 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1482 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1483 overlap_done = 1;
1484 goto out_del;
1485 }
1486
1487 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1488 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1489 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1490 overlap_done = 1;
1491 goto out_del;
1492 }
1493
1494
1495
1496
1497 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1498 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1499 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1500 lkb->lkb_id, lkb->lkb_wait_type);
1501 return -1;
1502 }
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1513 (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1514 is_overlap_cancel(lkb) && ms && !ms->m_result) {
1515 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1516 lkb->lkb_id);
1517 lkb->lkb_wait_type = 0;
1518 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1519 lkb->lkb_wait_count--;
1520 unhold_lkb(lkb);
1521 goto out_del;
1522 }
1523
1524
1525
1526
1527 if (lkb->lkb_wait_type) {
1528 lkb->lkb_wait_type = 0;
1529 goto out_del;
1530 }
1531
1532 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1533 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1534 lkb->lkb_remid, mstype, lkb->lkb_flags);
1535 return -1;
1536
1537 out_del:
1538
1539
1540
1541
1542
1543 if (overlap_done && lkb->lkb_wait_type) {
1544 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1545 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1546 lkb->lkb_wait_count--;
1547 unhold_lkb(lkb);
1548 lkb->lkb_wait_type = 0;
1549 }
1550
1551 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1552
1553 lkb->lkb_flags &= ~DLM_IFL_RESEND;
1554 lkb->lkb_wait_count--;
1555 if (!lkb->lkb_wait_count)
1556 list_del_init(&lkb->lkb_wait_reply);
1557 unhold_lkb(lkb);
1558 return 0;
1559 }
1560
1561 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1562 {
1563 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564 int error;
1565
1566 mutex_lock(&ls->ls_waiters_mutex);
1567 error = _remove_from_waiters(lkb, mstype, NULL);
1568 mutex_unlock(&ls->ls_waiters_mutex);
1569 return error;
1570 }
1571
1572
1573
1574
1575 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1576 {
1577 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1578 int error;
1579
1580 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
1581 mutex_lock(&ls->ls_waiters_mutex);
1582 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1583 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
1584 mutex_unlock(&ls->ls_waiters_mutex);
1585 return error;
1586 }
1587
1588
1589
1590
1591
1592 #define DLM_WAIT_PENDING_COND(ls, r) \
1593 (ls->ls_remove_len && \
1594 !rsb_cmp(r, ls->ls_remove_name, \
1595 ls->ls_remove_len))
1596
1597 static void wait_pending_remove(struct dlm_rsb *r)
1598 {
1599 struct dlm_ls *ls = r->res_ls;
1600 restart:
1601 spin_lock(&ls->ls_remove_spin);
1602 if (DLM_WAIT_PENDING_COND(ls, r)) {
1603 log_debug(ls, "delay lookup for remove dir %d %s",
1604 r->res_dir_nodeid, r->res_name);
1605 spin_unlock(&ls->ls_remove_spin);
1606 wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
1607 goto restart;
1608 }
1609 spin_unlock(&ls->ls_remove_spin);
1610 }
1611
1612
1613
1614
1615
1616
1617
1618
1619 static void shrink_bucket(struct dlm_ls *ls, int b)
1620 {
1621 struct rb_node *n, *next;
1622 struct dlm_rsb *r;
1623 char *name;
1624 int our_nodeid = dlm_our_nodeid();
1625 int remote_count = 0;
1626 int need_shrink = 0;
1627 int i, len, rv;
1628
1629 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1630
1631 spin_lock(&ls->ls_rsbtbl[b].lock);
1632
1633 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1634 spin_unlock(&ls->ls_rsbtbl[b].lock);
1635 return;
1636 }
1637
1638 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1639 next = rb_next(n);
1640 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1641
1642
1643
1644
1645
1646
1647 if (!dlm_no_directory(ls) &&
1648 (r->res_master_nodeid != our_nodeid) &&
1649 (dlm_dir_nodeid(r) == our_nodeid)) {
1650 continue;
1651 }
1652
1653 need_shrink = 1;
1654
1655 if (!time_after_eq(jiffies, r->res_toss_time +
1656 dlm_config.ci_toss_secs * HZ)) {
1657 continue;
1658 }
1659
1660 if (!dlm_no_directory(ls) &&
1661 (r->res_master_nodeid == our_nodeid) &&
1662 (dlm_dir_nodeid(r) != our_nodeid)) {
1663
1664
1665
1666
1667
1668 ls->ls_remove_lens[remote_count] = r->res_length;
1669 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1670 DLM_RESNAME_MAXLEN);
1671 remote_count++;
1672
1673 if (remote_count >= DLM_REMOVE_NAMES_MAX)
1674 break;
1675 continue;
1676 }
1677
1678 if (!kref_put(&r->res_ref, kill_rsb)) {
1679 log_error(ls, "tossed rsb in use %s", r->res_name);
1680 continue;
1681 }
1682
1683 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1684 dlm_free_rsb(r);
1685 }
1686
1687 if (need_shrink)
1688 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1689 else
1690 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1691 spin_unlock(&ls->ls_rsbtbl[b].lock);
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707 for (i = 0; i < remote_count; i++) {
1708 name = ls->ls_remove_names[i];
1709 len = ls->ls_remove_lens[i];
1710
1711 spin_lock(&ls->ls_rsbtbl[b].lock);
1712 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1713 if (rv) {
1714 spin_unlock(&ls->ls_rsbtbl[b].lock);
1715 log_debug(ls, "remove_name not toss %s", name);
1716 continue;
1717 }
1718
1719 if (r->res_master_nodeid != our_nodeid) {
1720 spin_unlock(&ls->ls_rsbtbl[b].lock);
1721 log_debug(ls, "remove_name master %d dir %d our %d %s",
1722 r->res_master_nodeid, r->res_dir_nodeid,
1723 our_nodeid, name);
1724 continue;
1725 }
1726
1727 if (r->res_dir_nodeid == our_nodeid) {
1728
1729 spin_unlock(&ls->ls_rsbtbl[b].lock);
1730 log_error(ls, "remove_name dir %d master %d our %d %s",
1731 r->res_dir_nodeid, r->res_master_nodeid,
1732 our_nodeid, name);
1733 continue;
1734 }
1735
1736 if (!time_after_eq(jiffies, r->res_toss_time +
1737 dlm_config.ci_toss_secs * HZ)) {
1738 spin_unlock(&ls->ls_rsbtbl[b].lock);
1739 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1740 r->res_toss_time, jiffies, name);
1741 continue;
1742 }
1743
1744 if (!kref_put(&r->res_ref, kill_rsb)) {
1745 spin_unlock(&ls->ls_rsbtbl[b].lock);
1746 log_error(ls, "remove_name in use %s", name);
1747 continue;
1748 }
1749
1750 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1751
1752
1753 spin_lock(&ls->ls_remove_spin);
1754 ls->ls_remove_len = len;
1755 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1756 spin_unlock(&ls->ls_remove_spin);
1757 spin_unlock(&ls->ls_rsbtbl[b].lock);
1758
1759 send_remove(r);
1760
1761
1762 spin_lock(&ls->ls_remove_spin);
1763 ls->ls_remove_len = 0;
1764 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1765 spin_unlock(&ls->ls_remove_spin);
1766 wake_up(&ls->ls_remove_wait);
1767
1768 dlm_free_rsb(r);
1769 }
1770 }
1771
1772 void dlm_scan_rsbs(struct dlm_ls *ls)
1773 {
1774 int i;
1775
1776 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1777 shrink_bucket(ls, i);
1778 if (dlm_locking_stopped(ls))
1779 break;
1780 cond_resched();
1781 }
1782 }
1783
1784 #ifdef CONFIG_DLM_DEPRECATED_API
1785 static void add_timeout(struct dlm_lkb *lkb)
1786 {
1787 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1788
1789 if (is_master_copy(lkb))
1790 return;
1791
1792 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1793 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1794 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1795 goto add_it;
1796 }
1797 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1798 goto add_it;
1799 return;
1800
1801 add_it:
1802 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1803 mutex_lock(&ls->ls_timeout_mutex);
1804 hold_lkb(lkb);
1805 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1806 mutex_unlock(&ls->ls_timeout_mutex);
1807 }
1808
1809 static void del_timeout(struct dlm_lkb *lkb)
1810 {
1811 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1812
1813 mutex_lock(&ls->ls_timeout_mutex);
1814 if (!list_empty(&lkb->lkb_time_list)) {
1815 list_del_init(&lkb->lkb_time_list);
1816 unhold_lkb(lkb);
1817 }
1818 mutex_unlock(&ls->ls_timeout_mutex);
1819 }
1820
1821
1822
1823
1824
1825
1826
1827 void dlm_scan_timeout(struct dlm_ls *ls)
1828 {
1829 struct dlm_rsb *r;
1830 struct dlm_lkb *lkb = NULL, *iter;
1831 int do_cancel, do_warn;
1832 s64 wait_us;
1833
1834 for (;;) {
1835 if (dlm_locking_stopped(ls))
1836 break;
1837
1838 do_cancel = 0;
1839 do_warn = 0;
1840 mutex_lock(&ls->ls_timeout_mutex);
1841 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
1842
1843 wait_us = ktime_to_us(ktime_sub(ktime_get(),
1844 iter->lkb_timestamp));
1845
1846 if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
1847 wait_us >= (iter->lkb_timeout_cs * 10000))
1848 do_cancel = 1;
1849
1850 if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1851 wait_us >= dlm_config.ci_timewarn_cs * 10000)
1852 do_warn = 1;
1853
1854 if (!do_cancel && !do_warn)
1855 continue;
1856 hold_lkb(iter);
1857 lkb = iter;
1858 break;
1859 }
1860 mutex_unlock(&ls->ls_timeout_mutex);
1861
1862 if (!lkb)
1863 break;
1864
1865 r = lkb->lkb_resource;
1866 hold_rsb(r);
1867 lock_rsb(r);
1868
1869 if (do_warn) {
1870
1871 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1872 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1873 del_timeout(lkb);
1874 dlm_timeout_warn(lkb);
1875 }
1876
1877 if (do_cancel) {
1878 log_debug(ls, "timeout cancel %x node %d %s",
1879 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1880 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1881 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1882 del_timeout(lkb);
1883 _cancel_lock(r, lkb);
1884 }
1885
1886 unlock_rsb(r);
1887 unhold_rsb(r);
1888 dlm_put_lkb(lkb);
1889 }
1890 }
1891
1892
1893
1894
1895 void dlm_adjust_timeouts(struct dlm_ls *ls)
1896 {
1897 struct dlm_lkb *lkb;
1898 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1899
1900 ls->ls_recover_begin = 0;
1901 mutex_lock(&ls->ls_timeout_mutex);
1902 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1903 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1904 mutex_unlock(&ls->ls_timeout_mutex);
1905 }
1906 #else
1907 static void add_timeout(struct dlm_lkb *lkb) { }
1908 static void del_timeout(struct dlm_lkb *lkb) { }
1909 #endif
1910
1911
1912
1913 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1914 {
1915 int b, len = r->res_ls->ls_lvblen;
1916
1917
1918
1919
1920
1921 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1922
1923 if (b == 1) {
1924 if (!lkb->lkb_lvbptr)
1925 return;
1926
1927 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1928 return;
1929
1930 if (!r->res_lvbptr)
1931 return;
1932
1933 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1934 lkb->lkb_lvbseq = r->res_lvbseq;
1935
1936 } else if (b == 0) {
1937 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1938 rsb_set_flag(r, RSB_VALNOTVALID);
1939 return;
1940 }
1941
1942 if (!lkb->lkb_lvbptr)
1943 return;
1944
1945 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1946 return;
1947
1948 if (!r->res_lvbptr)
1949 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1950
1951 if (!r->res_lvbptr)
1952 return;
1953
1954 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1955 r->res_lvbseq++;
1956 lkb->lkb_lvbseq = r->res_lvbseq;
1957 rsb_clear_flag(r, RSB_VALNOTVALID);
1958 }
1959
1960 if (rsb_flag(r, RSB_VALNOTVALID))
1961 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1962 }
1963
1964 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1965 {
1966 if (lkb->lkb_grmode < DLM_LOCK_PW)
1967 return;
1968
1969 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1970 rsb_set_flag(r, RSB_VALNOTVALID);
1971 return;
1972 }
1973
1974 if (!lkb->lkb_lvbptr)
1975 return;
1976
1977 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1978 return;
1979
1980 if (!r->res_lvbptr)
1981 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1982
1983 if (!r->res_lvbptr)
1984 return;
1985
1986 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1987 r->res_lvbseq++;
1988 rsb_clear_flag(r, RSB_VALNOTVALID);
1989 }
1990
1991
1992
1993 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1994 struct dlm_message *ms)
1995 {
1996 int b;
1997
1998 if (!lkb->lkb_lvbptr)
1999 return;
2000
2001 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2002 return;
2003
2004 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2005 if (b == 1) {
2006 int len = receive_extralen(ms);
2007 if (len > r->res_ls->ls_lvblen)
2008 len = r->res_ls->ls_lvblen;
2009 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2010 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
2011 }
2012 }
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2025 {
2026 del_lkb(r, lkb);
2027 lkb->lkb_grmode = DLM_LOCK_IV;
2028
2029
2030 unhold_lkb(lkb);
2031 }
2032
2033 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2034 {
2035 set_lvb_unlock(r, lkb);
2036 _remove_lock(r, lkb);
2037 }
2038
2039 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2040 {
2041 _remove_lock(r, lkb);
2042 }
2043
2044
2045
2046
2047
2048 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2049 {
2050 int rv = 0;
2051
2052 lkb->lkb_rqmode = DLM_LOCK_IV;
2053
2054 switch (lkb->lkb_status) {
2055 case DLM_LKSTS_GRANTED:
2056 break;
2057 case DLM_LKSTS_CONVERT:
2058 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2059 rv = 1;
2060 break;
2061 case DLM_LKSTS_WAITING:
2062 del_lkb(r, lkb);
2063 lkb->lkb_grmode = DLM_LOCK_IV;
2064
2065
2066 unhold_lkb(lkb);
2067 rv = -1;
2068 break;
2069 default:
2070 log_print("invalid status for revert %d", lkb->lkb_status);
2071 }
2072 return rv;
2073 }
2074
2075 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076 {
2077 return revert_lock(r, lkb);
2078 }
2079
2080 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2081 {
2082 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2083 lkb->lkb_grmode = lkb->lkb_rqmode;
2084 if (lkb->lkb_status)
2085 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2086 else
2087 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2088 }
2089
2090 lkb->lkb_rqmode = DLM_LOCK_IV;
2091 lkb->lkb_highbast = 0;
2092 }
2093
2094 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2095 {
2096 set_lvb_lock(r, lkb);
2097 _grant_lock(r, lkb);
2098 }
2099
2100 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2101 struct dlm_message *ms)
2102 {
2103 set_lvb_lock_pc(r, lkb, ms);
2104 _grant_lock(r, lkb);
2105 }
2106
2107
2108
2109
2110
2111 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2112 {
2113 grant_lock(r, lkb);
2114 if (is_master_copy(lkb))
2115 send_grant(r, lkb);
2116 else
2117 queue_cast(r, lkb, 0);
2118 }
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128 static void munge_demoted(struct dlm_lkb *lkb)
2129 {
2130 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2131 log_print("munge_demoted %x invalid modes gr %d rq %d",
2132 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2133 return;
2134 }
2135
2136 lkb->lkb_grmode = DLM_LOCK_NL;
2137 }
2138
2139 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2140 {
2141 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2142 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2143 log_print("munge_altmode %x invalid reply type %d",
2144 lkb->lkb_id, le32_to_cpu(ms->m_type));
2145 return;
2146 }
2147
2148 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2149 lkb->lkb_rqmode = DLM_LOCK_PR;
2150 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2151 lkb->lkb_rqmode = DLM_LOCK_CW;
2152 else {
2153 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2154 dlm_print_lkb(lkb);
2155 }
2156 }
2157
2158 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2159 {
2160 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2161 lkb_statequeue);
2162 if (lkb->lkb_id == first->lkb_id)
2163 return 1;
2164
2165 return 0;
2166 }
2167
2168
2169
2170 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2171 {
2172 struct dlm_lkb *this;
2173
2174 list_for_each_entry(this, head, lkb_statequeue) {
2175 if (this == lkb)
2176 continue;
2177 if (!modes_compat(this, lkb))
2178 return 1;
2179 }
2180 return 0;
2181 }
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2225 {
2226 struct dlm_lkb *lkb1;
2227 int lkb_is_ahead = 0;
2228
2229 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2230 if (lkb1 == lkb2) {
2231 lkb_is_ahead = 1;
2232 continue;
2233 }
2234
2235 if (!lkb_is_ahead) {
2236 if (!modes_compat(lkb2, lkb1))
2237 return 1;
2238 } else {
2239 if (!modes_compat(lkb2, lkb1) &&
2240 !modes_compat(lkb1, lkb2))
2241 return 1;
2242 }
2243 }
2244 return 0;
2245 }
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2264 int recover)
2265 {
2266 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2288 return 1;
2289
2290
2291
2292
2293
2294
2295 if (queue_conflict(&r->res_grantqueue, lkb))
2296 return 0;
2297
2298
2299
2300
2301
2302
2303
2304 if (queue_conflict(&r->res_convertqueue, lkb))
2305 return 0;
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321 if (conv && recover)
2322 return 1;
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2348 return 1;
2349
2350
2351
2352
2353
2354
2355 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2356 if (list_empty(&r->res_convertqueue))
2357 return 1;
2358 else
2359 return 0;
2360 }
2361
2362
2363
2364
2365
2366
2367 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2368 return 1;
2369
2370
2371
2372
2373
2374
2375
2376 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2377 return 1;
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390 if (now && !conv && list_empty(&r->res_convertqueue) &&
2391 list_empty(&r->res_waitqueue))
2392 return 1;
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2403 first_in_list(lkb, &r->res_waitqueue))
2404 return 1;
2405
2406 return 0;
2407 }
2408
2409 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2410 int recover, int *err)
2411 {
2412 int rv;
2413 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2414 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2415
2416 if (err)
2417 *err = 0;
2418
2419 rv = _can_be_granted(r, lkb, now, recover);
2420 if (rv)
2421 goto out;
2422
2423
2424
2425
2426
2427
2428
2429 if (is_convert && can_be_queued(lkb) &&
2430 conversion_deadlock_detect(r, lkb)) {
2431 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2432 lkb->lkb_grmode = DLM_LOCK_NL;
2433 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2434 } else if (err) {
2435 *err = -EDEADLK;
2436 } else {
2437 log_print("can_be_granted deadlock %x now %d",
2438 lkb->lkb_id, now);
2439 dlm_dump_rsb(r);
2440 }
2441 goto out;
2442 }
2443
2444
2445
2446
2447
2448
2449
2450
2451 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2452 alt = DLM_LOCK_PR;
2453 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2454 alt = DLM_LOCK_CW;
2455
2456 if (alt) {
2457 lkb->lkb_rqmode = alt;
2458 rv = _can_be_granted(r, lkb, now, 0);
2459 if (rv)
2460 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2461 else
2462 lkb->lkb_rqmode = rqmode;
2463 }
2464 out:
2465 return rv;
2466 }
2467
2468
2469
2470
2471 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2472 unsigned int *count)
2473 {
2474 struct dlm_lkb *lkb, *s;
2475 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2476 int hi, demoted, quit, grant_restart, demote_restart;
2477 int deadlk;
2478
2479 quit = 0;
2480 restart:
2481 grant_restart = 0;
2482 demote_restart = 0;
2483 hi = DLM_LOCK_IV;
2484
2485 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2486 demoted = is_demoted(lkb);
2487 deadlk = 0;
2488
2489 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2490 grant_lock_pending(r, lkb);
2491 grant_restart = 1;
2492 if (count)
2493 (*count)++;
2494 continue;
2495 }
2496
2497 if (!demoted && is_demoted(lkb)) {
2498 log_print("WARN: pending demoted %x node %d %s",
2499 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2500 demote_restart = 1;
2501 continue;
2502 }
2503
2504 if (deadlk) {
2505
2506
2507
2508
2509
2510 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2511 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2512 queue_bast(r, lkb, lkb->lkb_rqmode);
2513 lkb->lkb_highbast = lkb->lkb_rqmode;
2514 }
2515 } else {
2516 log_print("WARN: pending deadlock %x node %d %s",
2517 lkb->lkb_id, lkb->lkb_nodeid,
2518 r->res_name);
2519 dlm_dump_rsb(r);
2520 }
2521 continue;
2522 }
2523
2524 hi = max_t(int, lkb->lkb_rqmode, hi);
2525
2526 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2527 *cw = 1;
2528 }
2529
2530 if (grant_restart)
2531 goto restart;
2532 if (demote_restart && !quit) {
2533 quit = 1;
2534 goto restart;
2535 }
2536
2537 return max_t(int, high, hi);
2538 }
2539
2540 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2541 unsigned int *count)
2542 {
2543 struct dlm_lkb *lkb, *s;
2544
2545 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2546 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2547 grant_lock_pending(r, lkb);
2548 if (count)
2549 (*count)++;
2550 } else {
2551 high = max_t(int, lkb->lkb_rqmode, high);
2552 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2553 *cw = 1;
2554 }
2555 }
2556
2557 return high;
2558 }
2559
2560
2561
2562
2563
2564
2565 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2566 {
2567 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2568 if (gr->lkb_highbast < DLM_LOCK_EX)
2569 return 1;
2570 return 0;
2571 }
2572
2573 if (gr->lkb_highbast < high &&
2574 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2575 return 1;
2576 return 0;
2577 }
2578
2579 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2580 {
2581 struct dlm_lkb *lkb, *s;
2582 int high = DLM_LOCK_IV;
2583 int cw = 0;
2584
2585 if (!is_master(r)) {
2586 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2587 dlm_dump_rsb(r);
2588 return;
2589 }
2590
2591 high = grant_pending_convert(r, high, &cw, count);
2592 high = grant_pending_wait(r, high, &cw, count);
2593
2594 if (high == DLM_LOCK_IV)
2595 return;
2596
2597
2598
2599
2600
2601
2602
2603 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2604 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2605 if (cw && high == DLM_LOCK_PR &&
2606 lkb->lkb_grmode == DLM_LOCK_PR)
2607 queue_bast(r, lkb, DLM_LOCK_CW);
2608 else
2609 queue_bast(r, lkb, high);
2610 lkb->lkb_highbast = high;
2611 }
2612 }
2613 }
2614
2615 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2616 {
2617 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2618 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2619 if (gr->lkb_highbast < DLM_LOCK_EX)
2620 return 1;
2621 return 0;
2622 }
2623
2624 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2625 return 1;
2626 return 0;
2627 }
2628
2629 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2630 struct dlm_lkb *lkb)
2631 {
2632 struct dlm_lkb *gr;
2633
2634 list_for_each_entry(gr, head, lkb_statequeue) {
2635
2636 if (gr == lkb)
2637 continue;
2638 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2639 queue_bast(r, gr, lkb->lkb_rqmode);
2640 gr->lkb_highbast = lkb->lkb_rqmode;
2641 }
2642 }
2643 }
2644
2645 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2646 {
2647 send_bast_queue(r, &r->res_grantqueue, lkb);
2648 }
2649
2650 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2651 {
2652 send_bast_queue(r, &r->res_grantqueue, lkb);
2653 send_bast_queue(r, &r->res_convertqueue, lkb);
2654 }
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2676 {
2677 int our_nodeid = dlm_our_nodeid();
2678
2679 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2680 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2681 r->res_first_lkid = lkb->lkb_id;
2682 lkb->lkb_nodeid = r->res_nodeid;
2683 return 0;
2684 }
2685
2686 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2687 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2688 return 1;
2689 }
2690
2691 if (r->res_master_nodeid == our_nodeid) {
2692 lkb->lkb_nodeid = 0;
2693 return 0;
2694 }
2695
2696 if (r->res_master_nodeid) {
2697 lkb->lkb_nodeid = r->res_master_nodeid;
2698 return 0;
2699 }
2700
2701 if (dlm_dir_nodeid(r) == our_nodeid) {
2702
2703
2704
2705
2706
2707
2708 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2709 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2710 r->res_name);
2711 r->res_master_nodeid = our_nodeid;
2712 r->res_nodeid = 0;
2713 lkb->lkb_nodeid = 0;
2714 return 0;
2715 }
2716
2717 wait_pending_remove(r);
2718
2719 r->res_first_lkid = lkb->lkb_id;
2720 send_lookup(r, lkb);
2721 return 1;
2722 }
2723
2724 static void process_lookup_list(struct dlm_rsb *r)
2725 {
2726 struct dlm_lkb *lkb, *safe;
2727
2728 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2729 list_del_init(&lkb->lkb_rsb_lookup);
2730 _request_lock(r, lkb);
2731 schedule();
2732 }
2733 }
2734
2735
2736
2737 static void confirm_master(struct dlm_rsb *r, int error)
2738 {
2739 struct dlm_lkb *lkb;
2740
2741 if (!r->res_first_lkid)
2742 return;
2743
2744 switch (error) {
2745 case 0:
2746 case -EINPROGRESS:
2747 r->res_first_lkid = 0;
2748 process_lookup_list(r);
2749 break;
2750
2751 case -EAGAIN:
2752 case -EBADR:
2753 case -ENOTBLK:
2754
2755
2756
2757
2758 r->res_first_lkid = 0;
2759
2760 if (!list_empty(&r->res_lookup)) {
2761 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2762 lkb_rsb_lookup);
2763 list_del_init(&lkb->lkb_rsb_lookup);
2764 r->res_first_lkid = lkb->lkb_id;
2765 _request_lock(r, lkb);
2766 }
2767 break;
2768
2769 default:
2770 log_error(r->res_ls, "confirm_master unknown error %d", error);
2771 }
2772 }
2773
2774 #ifdef CONFIG_DLM_DEPRECATED_API
2775 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2776 int namelen, unsigned long timeout_cs,
2777 void (*ast) (void *astparam),
2778 void *astparam,
2779 void (*bast) (void *astparam, int mode),
2780 struct dlm_args *args)
2781 #else
2782 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2783 int namelen, void (*ast)(void *astparam),
2784 void *astparam,
2785 void (*bast)(void *astparam, int mode),
2786 struct dlm_args *args)
2787 #endif
2788 {
2789 int rv = -EINVAL;
2790
2791
2792
2793 if (mode < 0 || mode > DLM_LOCK_EX)
2794 goto out;
2795
2796 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2797 goto out;
2798
2799 if (flags & DLM_LKF_CANCEL)
2800 goto out;
2801
2802 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2803 goto out;
2804
2805 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2806 goto out;
2807
2808 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2809 goto out;
2810
2811 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2812 goto out;
2813
2814 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2815 goto out;
2816
2817 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2818 goto out;
2819
2820 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2821 goto out;
2822
2823 if (!ast || !lksb)
2824 goto out;
2825
2826 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2827 goto out;
2828
2829 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2830 goto out;
2831
2832
2833
2834
2835
2836 args->flags = flags;
2837 args->astfn = ast;
2838 args->astparam = astparam;
2839 args->bastfn = bast;
2840 #ifdef CONFIG_DLM_DEPRECATED_API
2841 args->timeout = timeout_cs;
2842 #endif
2843 args->mode = mode;
2844 args->lksb = lksb;
2845 rv = 0;
2846 out:
2847 return rv;
2848 }
2849
2850 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2851 {
2852 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2853 DLM_LKF_FORCEUNLOCK))
2854 return -EINVAL;
2855
2856 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2857 return -EINVAL;
2858
2859 args->flags = flags;
2860 args->astparam = astarg;
2861 return 0;
2862 }
2863
2864 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2865 struct dlm_args *args)
2866 {
2867 int rv = -EINVAL;
2868
2869 if (args->flags & DLM_LKF_CONVERT) {
2870 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2871 goto out;
2872
2873 if (args->flags & DLM_LKF_QUECVT &&
2874 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2875 goto out;
2876
2877 rv = -EBUSY;
2878 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2879 goto out;
2880
2881
2882 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2883 goto out;
2884
2885 if (is_overlap(lkb))
2886 goto out;
2887 }
2888
2889 lkb->lkb_exflags = args->flags;
2890 lkb->lkb_sbflags = 0;
2891 lkb->lkb_astfn = args->astfn;
2892 lkb->lkb_astparam = args->astparam;
2893 lkb->lkb_bastfn = args->bastfn;
2894 lkb->lkb_rqmode = args->mode;
2895 lkb->lkb_lksb = args->lksb;
2896 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2897 lkb->lkb_ownpid = (int) current->pid;
2898 #ifdef CONFIG_DLM_DEPRECATED_API
2899 lkb->lkb_timeout_cs = args->timeout;
2900 #endif
2901 rv = 0;
2902 out:
2903 if (rv)
2904 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2905 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2906 lkb->lkb_status, lkb->lkb_wait_type,
2907 lkb->lkb_resource->res_name);
2908 return rv;
2909 }
2910
2911
2912
2913
2914
2915
2916
2917
2918 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2919 {
2920 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2921 int rv = -EINVAL;
2922
2923 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2924 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2925 dlm_print_lkb(lkb);
2926 goto out;
2927 }
2928
2929
2930
2931
2932
2933 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2934 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2935 rv = -ENOENT;
2936 goto out;
2937 }
2938
2939
2940
2941
2942 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2943 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2944 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2945 list_del_init(&lkb->lkb_rsb_lookup);
2946 queue_cast(lkb->lkb_resource, lkb,
2947 args->flags & DLM_LKF_CANCEL ?
2948 -DLM_ECANCEL : -DLM_EUNLOCK);
2949 unhold_lkb(lkb);
2950 }
2951
2952 rv = -EBUSY;
2953 goto out;
2954 }
2955
2956
2957
2958 if (args->flags & DLM_LKF_CANCEL) {
2959 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2960 goto out;
2961
2962 if (is_overlap(lkb))
2963 goto out;
2964
2965
2966 del_timeout(lkb);
2967
2968 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2969 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2970 rv = -EBUSY;
2971 goto out;
2972 }
2973
2974
2975 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2976 !lkb->lkb_wait_type) {
2977 rv = -EBUSY;
2978 goto out;
2979 }
2980
2981 switch (lkb->lkb_wait_type) {
2982 case DLM_MSG_LOOKUP:
2983 case DLM_MSG_REQUEST:
2984 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2985 rv = -EBUSY;
2986 goto out;
2987 case DLM_MSG_UNLOCK:
2988 case DLM_MSG_CANCEL:
2989 goto out;
2990 }
2991
2992 goto out_ok;
2993 }
2994
2995
2996
2997
2998
2999 if (args->flags & DLM_LKF_FORCEUNLOCK) {
3000 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3001 goto out;
3002
3003 if (is_overlap_unlock(lkb))
3004 goto out;
3005
3006
3007 del_timeout(lkb);
3008
3009 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3010 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3011 rv = -EBUSY;
3012 goto out;
3013 }
3014
3015 switch (lkb->lkb_wait_type) {
3016 case DLM_MSG_LOOKUP:
3017 case DLM_MSG_REQUEST:
3018 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3019 rv = -EBUSY;
3020 goto out;
3021 case DLM_MSG_UNLOCK:
3022 goto out;
3023 }
3024
3025 goto out_ok;
3026 }
3027
3028
3029 rv = -EBUSY;
3030 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3031 goto out;
3032
3033 out_ok:
3034
3035 lkb->lkb_exflags |= args->flags;
3036 lkb->lkb_sbflags = 0;
3037 lkb->lkb_astparam = args->astparam;
3038 rv = 0;
3039 out:
3040 if (rv)
3041 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3042 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3043 args->flags, lkb->lkb_wait_type,
3044 lkb->lkb_resource->res_name);
3045 return rv;
3046 }
3047
3048
3049
3050
3051
3052
3053
3054
3055 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3056 {
3057 int error = 0;
3058
3059 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3060 grant_lock(r, lkb);
3061 queue_cast(r, lkb, 0);
3062 goto out;
3063 }
3064
3065 if (can_be_queued(lkb)) {
3066 error = -EINPROGRESS;
3067 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3068 add_timeout(lkb);
3069 goto out;
3070 }
3071
3072 error = -EAGAIN;
3073 queue_cast(r, lkb, -EAGAIN);
3074 out:
3075 return error;
3076 }
3077
3078 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3079 int error)
3080 {
3081 switch (error) {
3082 case -EAGAIN:
3083 if (force_blocking_asts(lkb))
3084 send_blocking_asts_all(r, lkb);
3085 break;
3086 case -EINPROGRESS:
3087 send_blocking_asts(r, lkb);
3088 break;
3089 }
3090 }
3091
3092 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3093 {
3094 int error = 0;
3095 int deadlk = 0;
3096
3097
3098
3099 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3100 grant_lock(r, lkb);
3101 queue_cast(r, lkb, 0);
3102 goto out;
3103 }
3104
3105
3106
3107
3108
3109 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3110
3111 revert_lock(r, lkb);
3112 queue_cast(r, lkb, -EDEADLK);
3113 error = -EDEADLK;
3114 goto out;
3115 }
3116
3117
3118
3119
3120
3121
3122
3123 if (is_demoted(lkb)) {
3124 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3125 if (_can_be_granted(r, lkb, 1, 0)) {
3126 grant_lock(r, lkb);
3127 queue_cast(r, lkb, 0);
3128 goto out;
3129 }
3130
3131 }
3132
3133 if (can_be_queued(lkb)) {
3134 error = -EINPROGRESS;
3135 del_lkb(r, lkb);
3136 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3137 add_timeout(lkb);
3138 goto out;
3139 }
3140
3141 error = -EAGAIN;
3142 queue_cast(r, lkb, -EAGAIN);
3143 out:
3144 return error;
3145 }
3146
3147 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3148 int error)
3149 {
3150 switch (error) {
3151 case 0:
3152 grant_pending_locks(r, NULL);
3153
3154 break;
3155 case -EAGAIN:
3156 if (force_blocking_asts(lkb))
3157 send_blocking_asts_all(r, lkb);
3158 break;
3159 case -EINPROGRESS:
3160 send_blocking_asts(r, lkb);
3161 break;
3162 }
3163 }
3164
3165 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3166 {
3167 remove_lock(r, lkb);
3168 queue_cast(r, lkb, -DLM_EUNLOCK);
3169 return -DLM_EUNLOCK;
3170 }
3171
3172 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3173 int error)
3174 {
3175 grant_pending_locks(r, NULL);
3176 }
3177
3178
3179
3180 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3181 {
3182 int error;
3183
3184 error = revert_lock(r, lkb);
3185 if (error) {
3186 queue_cast(r, lkb, -DLM_ECANCEL);
3187 return -DLM_ECANCEL;
3188 }
3189 return 0;
3190 }
3191
3192 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3193 int error)
3194 {
3195 if (error)
3196 grant_pending_locks(r, NULL);
3197 }
3198
3199
3200
3201
3202
3203
3204
3205
3206 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3207 {
3208 int error;
3209
3210
3211
3212 error = set_master(r, lkb);
3213 if (error < 0)
3214 goto out;
3215 if (error) {
3216 error = 0;
3217 goto out;
3218 }
3219
3220 if (is_remote(r)) {
3221
3222 error = send_request(r, lkb);
3223 } else {
3224 error = do_request(r, lkb);
3225
3226
3227 do_request_effects(r, lkb, error);
3228 }
3229 out:
3230 return error;
3231 }
3232
3233
3234
3235 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3236 {
3237 int error;
3238
3239 if (is_remote(r)) {
3240
3241 error = send_convert(r, lkb);
3242 } else {
3243 error = do_convert(r, lkb);
3244
3245
3246 do_convert_effects(r, lkb, error);
3247 }
3248
3249 return error;
3250 }
3251
3252
3253
3254 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3255 {
3256 int error;
3257
3258 if (is_remote(r)) {
3259
3260 error = send_unlock(r, lkb);
3261 } else {
3262 error = do_unlock(r, lkb);
3263
3264
3265 do_unlock_effects(r, lkb, error);
3266 }
3267
3268 return error;
3269 }
3270
3271
3272
3273 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3274 {
3275 int error;
3276
3277 if (is_remote(r)) {
3278
3279 error = send_cancel(r, lkb);
3280 } else {
3281 error = do_cancel(r, lkb);
3282
3283
3284 do_cancel_effects(r, lkb, error);
3285 }
3286
3287 return error;
3288 }
3289
3290
3291
3292
3293
3294
3295 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3296 int len, struct dlm_args *args)
3297 {
3298 struct dlm_rsb *r;
3299 int error;
3300
3301 error = validate_lock_args(ls, lkb, args);
3302 if (error)
3303 return error;
3304
3305 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3306 if (error)
3307 return error;
3308
3309 lock_rsb(r);
3310
3311 attach_lkb(r, lkb);
3312 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3313
3314 error = _request_lock(r, lkb);
3315
3316 unlock_rsb(r);
3317 put_rsb(r);
3318 return error;
3319 }
3320
3321 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3322 struct dlm_args *args)
3323 {
3324 struct dlm_rsb *r;
3325 int error;
3326
3327 r = lkb->lkb_resource;
3328
3329 hold_rsb(r);
3330 lock_rsb(r);
3331
3332 error = validate_lock_args(ls, lkb, args);
3333 if (error)
3334 goto out;
3335
3336 error = _convert_lock(r, lkb);
3337 out:
3338 unlock_rsb(r);
3339 put_rsb(r);
3340 return error;
3341 }
3342
3343 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3344 struct dlm_args *args)
3345 {
3346 struct dlm_rsb *r;
3347 int error;
3348
3349 r = lkb->lkb_resource;
3350
3351 hold_rsb(r);
3352 lock_rsb(r);
3353
3354 error = validate_unlock_args(lkb, args);
3355 if (error)
3356 goto out;
3357
3358 error = _unlock_lock(r, lkb);
3359 out:
3360 unlock_rsb(r);
3361 put_rsb(r);
3362 return error;
3363 }
3364
3365 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3366 struct dlm_args *args)
3367 {
3368 struct dlm_rsb *r;
3369 int error;
3370
3371 r = lkb->lkb_resource;
3372
3373 hold_rsb(r);
3374 lock_rsb(r);
3375
3376 error = validate_unlock_args(lkb, args);
3377 if (error)
3378 goto out;
3379
3380 error = _cancel_lock(r, lkb);
3381 out:
3382 unlock_rsb(r);
3383 put_rsb(r);
3384 return error;
3385 }
3386
3387
3388
3389
3390
3391 int dlm_lock(dlm_lockspace_t *lockspace,
3392 int mode,
3393 struct dlm_lksb *lksb,
3394 uint32_t flags,
3395 void *name,
3396 unsigned int namelen,
3397 uint32_t parent_lkid,
3398 void (*ast) (void *astarg),
3399 void *astarg,
3400 void (*bast) (void *astarg, int mode))
3401 {
3402 struct dlm_ls *ls;
3403 struct dlm_lkb *lkb;
3404 struct dlm_args args;
3405 int error, convert = flags & DLM_LKF_CONVERT;
3406
3407 ls = dlm_find_lockspace_local(lockspace);
3408 if (!ls)
3409 return -EINVAL;
3410
3411 dlm_lock_recovery(ls);
3412
3413 if (convert)
3414 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3415 else
3416 error = create_lkb(ls, &lkb);
3417
3418 if (error)
3419 goto out;
3420
3421 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3422
3423 #ifdef CONFIG_DLM_DEPRECATED_API
3424 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3425 astarg, bast, &args);
3426 #else
3427 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3428 &args);
3429 #endif
3430 if (error)
3431 goto out_put;
3432
3433 if (convert)
3434 error = convert_lock(ls, lkb, &args);
3435 else
3436 error = request_lock(ls, lkb, name, namelen, &args);
3437
3438 if (error == -EINPROGRESS)
3439 error = 0;
3440 out_put:
3441 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error);
3442
3443 if (convert || error)
3444 __put_lkb(ls, lkb);
3445 if (error == -EAGAIN || error == -EDEADLK)
3446 error = 0;
3447 out:
3448 dlm_unlock_recovery(ls);
3449 dlm_put_lockspace(ls);
3450 return error;
3451 }
3452
3453 int dlm_unlock(dlm_lockspace_t *lockspace,
3454 uint32_t lkid,
3455 uint32_t flags,
3456 struct dlm_lksb *lksb,
3457 void *astarg)
3458 {
3459 struct dlm_ls *ls;
3460 struct dlm_lkb *lkb;
3461 struct dlm_args args;
3462 int error;
3463
3464 ls = dlm_find_lockspace_local(lockspace);
3465 if (!ls)
3466 return -EINVAL;
3467
3468 dlm_lock_recovery(ls);
3469
3470 error = find_lkb(ls, lkid, &lkb);
3471 if (error)
3472 goto out;
3473
3474 trace_dlm_unlock_start(ls, lkb, flags);
3475
3476 error = set_unlock_args(flags, astarg, &args);
3477 if (error)
3478 goto out_put;
3479
3480 if (flags & DLM_LKF_CANCEL)
3481 error = cancel_lock(ls, lkb, &args);
3482 else
3483 error = unlock_lock(ls, lkb, &args);
3484
3485 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3486 error = 0;
3487 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3488 error = 0;
3489 out_put:
3490 trace_dlm_unlock_end(ls, lkb, flags, error);
3491
3492 dlm_put_lkb(lkb);
3493 out:
3494 dlm_unlock_recovery(ls);
3495 dlm_put_lockspace(ls);
3496 return error;
3497 }
3498
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521 static int _create_message(struct dlm_ls *ls, int mb_len,
3522 int to_nodeid, int mstype,
3523 struct dlm_message **ms_ret,
3524 struct dlm_mhandle **mh_ret)
3525 {
3526 struct dlm_message *ms;
3527 struct dlm_mhandle *mh;
3528 char *mb;
3529
3530
3531
3532
3533
3534 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
3535 if (!mh)
3536 return -ENOBUFS;
3537
3538 ms = (struct dlm_message *) mb;
3539
3540 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3541 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3542 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3543 ms->m_header.h_length = cpu_to_le16(mb_len);
3544 ms->m_header.h_cmd = DLM_MSG;
3545
3546 ms->m_type = cpu_to_le32(mstype);
3547
3548 *mh_ret = mh;
3549 *ms_ret = ms;
3550 return 0;
3551 }
3552
3553 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3554 int to_nodeid, int mstype,
3555 struct dlm_message **ms_ret,
3556 struct dlm_mhandle **mh_ret)
3557 {
3558 int mb_len = sizeof(struct dlm_message);
3559
3560 switch (mstype) {
3561 case DLM_MSG_REQUEST:
3562 case DLM_MSG_LOOKUP:
3563 case DLM_MSG_REMOVE:
3564 mb_len += r->res_length;
3565 break;
3566 case DLM_MSG_CONVERT:
3567 case DLM_MSG_UNLOCK:
3568 case DLM_MSG_REQUEST_REPLY:
3569 case DLM_MSG_CONVERT_REPLY:
3570 case DLM_MSG_GRANT:
3571 if (lkb && lkb->lkb_lvbptr)
3572 mb_len += r->res_ls->ls_lvblen;
3573 break;
3574 }
3575
3576 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3577 ms_ret, mh_ret);
3578 }
3579
3580
3581
3582
3583 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3584 {
3585 dlm_midcomms_commit_mhandle(mh);
3586 return 0;
3587 }
3588
3589 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3590 struct dlm_message *ms)
3591 {
3592 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3593 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3594 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3595 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3596 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3597 ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags);
3598 ms->m_flags = cpu_to_le32(lkb->lkb_flags);
3599 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3600 ms->m_status = cpu_to_le32(lkb->lkb_status);
3601 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3602 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3603 ms->m_hash = cpu_to_le32(r->res_hash);
3604
3605
3606
3607
3608 if (lkb->lkb_bastfn)
3609 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3610 if (lkb->lkb_astfn)
3611 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3612
3613
3614
3615
3616 switch (ms->m_type) {
3617 case cpu_to_le32(DLM_MSG_REQUEST):
3618 case cpu_to_le32(DLM_MSG_LOOKUP):
3619 memcpy(ms->m_extra, r->res_name, r->res_length);
3620 break;
3621 case cpu_to_le32(DLM_MSG_CONVERT):
3622 case cpu_to_le32(DLM_MSG_UNLOCK):
3623 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3624 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3625 case cpu_to_le32(DLM_MSG_GRANT):
3626 if (!lkb->lkb_lvbptr)
3627 break;
3628 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3629 break;
3630 }
3631 }
3632
3633 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3634 {
3635 struct dlm_message *ms;
3636 struct dlm_mhandle *mh;
3637 int to_nodeid, error;
3638
3639 to_nodeid = r->res_nodeid;
3640
3641 error = add_to_waiters(lkb, mstype, to_nodeid);
3642 if (error)
3643 return error;
3644
3645 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3646 if (error)
3647 goto fail;
3648
3649 send_args(r, lkb, ms);
3650
3651 error = send_message(mh, ms);
3652 if (error)
3653 goto fail;
3654 return 0;
3655
3656 fail:
3657 remove_from_waiters(lkb, msg_reply_type(mstype));
3658 return error;
3659 }
3660
3661 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3662 {
3663 return send_common(r, lkb, DLM_MSG_REQUEST);
3664 }
3665
3666 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3667 {
3668 int error;
3669
3670 error = send_common(r, lkb, DLM_MSG_CONVERT);
3671
3672
3673 if (!error && down_conversion(lkb)) {
3674 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3675 r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
3676 r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3677 r->res_ls->ls_stub_ms.m_result = 0;
3678 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3679 }
3680
3681 return error;
3682 }
3683
3684
3685
3686
3687
3688 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3689 {
3690 return send_common(r, lkb, DLM_MSG_UNLOCK);
3691 }
3692
3693 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3694 {
3695 return send_common(r, lkb, DLM_MSG_CANCEL);
3696 }
3697
3698 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3699 {
3700 struct dlm_message *ms;
3701 struct dlm_mhandle *mh;
3702 int to_nodeid, error;
3703
3704 to_nodeid = lkb->lkb_nodeid;
3705
3706 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3707 if (error)
3708 goto out;
3709
3710 send_args(r, lkb, ms);
3711
3712 ms->m_result = 0;
3713
3714 error = send_message(mh, ms);
3715 out:
3716 return error;
3717 }
3718
3719 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3720 {
3721 struct dlm_message *ms;
3722 struct dlm_mhandle *mh;
3723 int to_nodeid, error;
3724
3725 to_nodeid = lkb->lkb_nodeid;
3726
3727 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3728 if (error)
3729 goto out;
3730
3731 send_args(r, lkb, ms);
3732
3733 ms->m_bastmode = cpu_to_le32(mode);
3734
3735 error = send_message(mh, ms);
3736 out:
3737 return error;
3738 }
3739
3740 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3741 {
3742 struct dlm_message *ms;
3743 struct dlm_mhandle *mh;
3744 int to_nodeid, error;
3745
3746 to_nodeid = dlm_dir_nodeid(r);
3747
3748 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3749 if (error)
3750 return error;
3751
3752 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3753 if (error)
3754 goto fail;
3755
3756 send_args(r, lkb, ms);
3757
3758 error = send_message(mh, ms);
3759 if (error)
3760 goto fail;
3761 return 0;
3762
3763 fail:
3764 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3765 return error;
3766 }
3767
3768 static int send_remove(struct dlm_rsb *r)
3769 {
3770 struct dlm_message *ms;
3771 struct dlm_mhandle *mh;
3772 int to_nodeid, error;
3773
3774 to_nodeid = dlm_dir_nodeid(r);
3775
3776 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3777 if (error)
3778 goto out;
3779
3780 memcpy(ms->m_extra, r->res_name, r->res_length);
3781 ms->m_hash = cpu_to_le32(r->res_hash);
3782
3783 error = send_message(mh, ms);
3784 out:
3785 return error;
3786 }
3787
3788 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3789 int mstype, int rv)
3790 {
3791 struct dlm_message *ms;
3792 struct dlm_mhandle *mh;
3793 int to_nodeid, error;
3794
3795 to_nodeid = lkb->lkb_nodeid;
3796
3797 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3798 if (error)
3799 goto out;
3800
3801 send_args(r, lkb, ms);
3802
3803 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3804
3805 error = send_message(mh, ms);
3806 out:
3807 return error;
3808 }
3809
3810 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3811 {
3812 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3813 }
3814
3815 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3816 {
3817 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3818 }
3819
3820 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3821 {
3822 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3823 }
3824
3825 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3826 {
3827 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3828 }
3829
3830 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3831 int ret_nodeid, int rv)
3832 {
3833 struct dlm_rsb *r = &ls->ls_stub_rsb;
3834 struct dlm_message *ms;
3835 struct dlm_mhandle *mh;
3836 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3837
3838 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3839 if (error)
3840 goto out;
3841
3842 ms->m_lkid = ms_in->m_lkid;
3843 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3844 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3845
3846 error = send_message(mh, ms);
3847 out:
3848 return error;
3849 }
3850
3851
3852
3853
3854
3855 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3856 {
3857 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3858 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3859 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3860 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3861 }
3862
3863 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3864 {
3865 if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS))
3866 return;
3867
3868 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3869 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3870 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3871 }
3872
3873 static int receive_extralen(struct dlm_message *ms)
3874 {
3875 return (le16_to_cpu(ms->m_header.h_length) -
3876 sizeof(struct dlm_message));
3877 }
3878
3879 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3880 struct dlm_message *ms)
3881 {
3882 int len;
3883
3884 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3885 if (!lkb->lkb_lvbptr)
3886 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3887 if (!lkb->lkb_lvbptr)
3888 return -ENOMEM;
3889 len = receive_extralen(ms);
3890 if (len > ls->ls_lvblen)
3891 len = ls->ls_lvblen;
3892 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3893 }
3894 return 0;
3895 }
3896
3897 static void fake_bastfn(void *astparam, int mode)
3898 {
3899 log_print("fake_bastfn should not be called");
3900 }
3901
3902 static void fake_astfn(void *astparam)
3903 {
3904 log_print("fake_astfn should not be called");
3905 }
3906
3907 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3908 struct dlm_message *ms)
3909 {
3910 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3911 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3912 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3913 lkb->lkb_grmode = DLM_LOCK_IV;
3914 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3915
3916 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3917 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3918
3919 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3920
3921 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3922 if (!lkb->lkb_lvbptr)
3923 return -ENOMEM;
3924 }
3925
3926 return 0;
3927 }
3928
3929 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3930 struct dlm_message *ms)
3931 {
3932 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3933 return -EBUSY;
3934
3935 if (receive_lvb(ls, lkb, ms))
3936 return -ENOMEM;
3937
3938 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3939 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3940
3941 return 0;
3942 }
3943
3944 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3945 struct dlm_message *ms)
3946 {
3947 if (receive_lvb(ls, lkb, ms))
3948 return -ENOMEM;
3949 return 0;
3950 }
3951
3952
3953
3954
3955 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3956 {
3957 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3958 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3959 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3960 }
3961
3962
3963
3964
3965 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3966 {
3967 int from = le32_to_cpu(ms->m_header.h_nodeid);
3968 int error = 0;
3969
3970
3971 if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
3972 ~lkb->lkb_flags & DLM_IFL_USER) {
3973 log_error(lkb->lkb_resource->res_ls,
3974 "got user dlm message for a kernel lock");
3975 error = -EINVAL;
3976 goto out;
3977 }
3978
3979 switch (ms->m_type) {
3980 case cpu_to_le32(DLM_MSG_CONVERT):
3981 case cpu_to_le32(DLM_MSG_UNLOCK):
3982 case cpu_to_le32(DLM_MSG_CANCEL):
3983 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3984 error = -EINVAL;
3985 break;
3986
3987 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3988 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3989 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3990 case cpu_to_le32(DLM_MSG_GRANT):
3991 case cpu_to_le32(DLM_MSG_BAST):
3992 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3993 error = -EINVAL;
3994 break;
3995
3996 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3997 if (!is_process_copy(lkb))
3998 error = -EINVAL;
3999 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4000 error = -EINVAL;
4001 break;
4002
4003 default:
4004 error = -EINVAL;
4005 }
4006
4007 out:
4008 if (error)
4009 log_error(lkb->lkb_resource->res_ls,
4010 "ignore invalid message %d from %d %x %x %x %d",
4011 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
4012 lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
4013 return error;
4014 }
4015
4016 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4017 {
4018 char name[DLM_RESNAME_MAXLEN + 1];
4019 struct dlm_message *ms;
4020 struct dlm_mhandle *mh;
4021 struct dlm_rsb *r;
4022 uint32_t hash, b;
4023 int rv, dir_nodeid;
4024
4025 memset(name, 0, sizeof(name));
4026 memcpy(name, ms_name, len);
4027
4028 hash = jhash(name, len, 0);
4029 b = hash & (ls->ls_rsbtbl_size - 1);
4030
4031 dir_nodeid = dlm_hash2nodeid(ls, hash);
4032
4033 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4034
4035 spin_lock(&ls->ls_rsbtbl[b].lock);
4036 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4037 if (!rv) {
4038 spin_unlock(&ls->ls_rsbtbl[b].lock);
4039 log_error(ls, "repeat_remove on keep %s", name);
4040 return;
4041 }
4042
4043 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4044 if (!rv) {
4045 spin_unlock(&ls->ls_rsbtbl[b].lock);
4046 log_error(ls, "repeat_remove on toss %s", name);
4047 return;
4048 }
4049
4050
4051
4052 spin_lock(&ls->ls_remove_spin);
4053 ls->ls_remove_len = len;
4054 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4055 spin_unlock(&ls->ls_remove_spin);
4056 spin_unlock(&ls->ls_rsbtbl[b].lock);
4057
4058 rv = _create_message(ls, sizeof(struct dlm_message) + len,
4059 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4060 if (rv)
4061 goto out;
4062
4063 memcpy(ms->m_extra, name, len);
4064 ms->m_hash = cpu_to_le32(hash);
4065
4066 send_message(mh, ms);
4067
4068 out:
4069 spin_lock(&ls->ls_remove_spin);
4070 ls->ls_remove_len = 0;
4071 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4072 spin_unlock(&ls->ls_remove_spin);
4073 wake_up(&ls->ls_remove_wait);
4074 }
4075
4076 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4077 {
4078 struct dlm_lkb *lkb;
4079 struct dlm_rsb *r;
4080 int from_nodeid;
4081 int error, namelen = 0;
4082
4083 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4084
4085 error = create_lkb(ls, &lkb);
4086 if (error)
4087 goto fail;
4088
4089 receive_flags(lkb, ms);
4090 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4091 error = receive_request_args(ls, lkb, ms);
4092 if (error) {
4093 __put_lkb(ls, lkb);
4094 goto fail;
4095 }
4096
4097
4098
4099
4100
4101
4102
4103 namelen = receive_extralen(ms);
4104
4105 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4106 R_RECEIVE_REQUEST, &r);
4107 if (error) {
4108 __put_lkb(ls, lkb);
4109 goto fail;
4110 }
4111
4112 lock_rsb(r);
4113
4114 if (r->res_master_nodeid != dlm_our_nodeid()) {
4115 error = validate_master_nodeid(ls, r, from_nodeid);
4116 if (error) {
4117 unlock_rsb(r);
4118 put_rsb(r);
4119 __put_lkb(ls, lkb);
4120 goto fail;
4121 }
4122 }
4123
4124 attach_lkb(r, lkb);
4125 error = do_request(r, lkb);
4126 send_request_reply(r, lkb, error);
4127 do_request_effects(r, lkb, error);
4128
4129 unlock_rsb(r);
4130 put_rsb(r);
4131
4132 if (error == -EINPROGRESS)
4133 error = 0;
4134 if (error)
4135 dlm_put_lkb(lkb);
4136 return 0;
4137
4138 fail:
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154 if (error != -ENOTBLK) {
4155 log_limit(ls, "receive_request %x from %d %d",
4156 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4157 }
4158
4159 if (namelen && error == -EBADR) {
4160 send_repeat_remove(ls, ms->m_extra, namelen);
4161 msleep(1000);
4162 }
4163
4164 setup_stub_lkb(ls, ms);
4165 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4166 return error;
4167 }
4168
4169 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4170 {
4171 struct dlm_lkb *lkb;
4172 struct dlm_rsb *r;
4173 int error, reply = 1;
4174
4175 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4176 if (error)
4177 goto fail;
4178
4179 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4180 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4181 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4182 (unsigned long long)lkb->lkb_recover_seq,
4183 le32_to_cpu(ms->m_header.h_nodeid),
4184 le32_to_cpu(ms->m_lkid));
4185 error = -ENOENT;
4186 dlm_put_lkb(lkb);
4187 goto fail;
4188 }
4189
4190 r = lkb->lkb_resource;
4191
4192 hold_rsb(r);
4193 lock_rsb(r);
4194
4195 error = validate_message(lkb, ms);
4196 if (error)
4197 goto out;
4198
4199 receive_flags(lkb, ms);
4200
4201 error = receive_convert_args(ls, lkb, ms);
4202 if (error) {
4203 send_convert_reply(r, lkb, error);
4204 goto out;
4205 }
4206
4207 reply = !down_conversion(lkb);
4208
4209 error = do_convert(r, lkb);
4210 if (reply)
4211 send_convert_reply(r, lkb, error);
4212 do_convert_effects(r, lkb, error);
4213 out:
4214 unlock_rsb(r);
4215 put_rsb(r);
4216 dlm_put_lkb(lkb);
4217 return 0;
4218
4219 fail:
4220 setup_stub_lkb(ls, ms);
4221 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4222 return error;
4223 }
4224
4225 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4226 {
4227 struct dlm_lkb *lkb;
4228 struct dlm_rsb *r;
4229 int error;
4230
4231 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4232 if (error)
4233 goto fail;
4234
4235 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4236 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4237 lkb->lkb_id, lkb->lkb_remid,
4238 le32_to_cpu(ms->m_header.h_nodeid),
4239 le32_to_cpu(ms->m_lkid));
4240 error = -ENOENT;
4241 dlm_put_lkb(lkb);
4242 goto fail;
4243 }
4244
4245 r = lkb->lkb_resource;
4246
4247 hold_rsb(r);
4248 lock_rsb(r);
4249
4250 error = validate_message(lkb, ms);
4251 if (error)
4252 goto out;
4253
4254 receive_flags(lkb, ms);
4255
4256 error = receive_unlock_args(ls, lkb, ms);
4257 if (error) {
4258 send_unlock_reply(r, lkb, error);
4259 goto out;
4260 }
4261
4262 error = do_unlock(r, lkb);
4263 send_unlock_reply(r, lkb, error);
4264 do_unlock_effects(r, lkb, error);
4265 out:
4266 unlock_rsb(r);
4267 put_rsb(r);
4268 dlm_put_lkb(lkb);
4269 return 0;
4270
4271 fail:
4272 setup_stub_lkb(ls, ms);
4273 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4274 return error;
4275 }
4276
4277 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4278 {
4279 struct dlm_lkb *lkb;
4280 struct dlm_rsb *r;
4281 int error;
4282
4283 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4284 if (error)
4285 goto fail;
4286
4287 receive_flags(lkb, ms);
4288
4289 r = lkb->lkb_resource;
4290
4291 hold_rsb(r);
4292 lock_rsb(r);
4293
4294 error = validate_message(lkb, ms);
4295 if (error)
4296 goto out;
4297
4298 error = do_cancel(r, lkb);
4299 send_cancel_reply(r, lkb, error);
4300 do_cancel_effects(r, lkb, error);
4301 out:
4302 unlock_rsb(r);
4303 put_rsb(r);
4304 dlm_put_lkb(lkb);
4305 return 0;
4306
4307 fail:
4308 setup_stub_lkb(ls, ms);
4309 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4310 return error;
4311 }
4312
4313 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4314 {
4315 struct dlm_lkb *lkb;
4316 struct dlm_rsb *r;
4317 int error;
4318
4319 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4320 if (error)
4321 return error;
4322
4323 r = lkb->lkb_resource;
4324
4325 hold_rsb(r);
4326 lock_rsb(r);
4327
4328 error = validate_message(lkb, ms);
4329 if (error)
4330 goto out;
4331
4332 receive_flags_reply(lkb, ms);
4333 if (is_altmode(lkb))
4334 munge_altmode(lkb, ms);
4335 grant_lock_pc(r, lkb, ms);
4336 queue_cast(r, lkb, 0);
4337 out:
4338 unlock_rsb(r);
4339 put_rsb(r);
4340 dlm_put_lkb(lkb);
4341 return 0;
4342 }
4343
4344 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4345 {
4346 struct dlm_lkb *lkb;
4347 struct dlm_rsb *r;
4348 int error;
4349
4350 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4351 if (error)
4352 return error;
4353
4354 r = lkb->lkb_resource;
4355
4356 hold_rsb(r);
4357 lock_rsb(r);
4358
4359 error = validate_message(lkb, ms);
4360 if (error)
4361 goto out;
4362
4363 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4364 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4365 out:
4366 unlock_rsb(r);
4367 put_rsb(r);
4368 dlm_put_lkb(lkb);
4369 return 0;
4370 }
4371
4372 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4373 {
4374 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4375
4376 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4377 our_nodeid = dlm_our_nodeid();
4378
4379 len = receive_extralen(ms);
4380
4381 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4382 &ret_nodeid, NULL);
4383
4384
4385 if (!error && ret_nodeid == our_nodeid) {
4386 receive_request(ls, ms);
4387 return;
4388 }
4389 send_lookup_reply(ls, ms, ret_nodeid, error);
4390 }
4391
4392 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4393 {
4394 char name[DLM_RESNAME_MAXLEN+1];
4395 struct dlm_rsb *r;
4396 uint32_t hash, b;
4397 int rv, len, dir_nodeid, from_nodeid;
4398
4399 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4400
4401 len = receive_extralen(ms);
4402
4403 if (len > DLM_RESNAME_MAXLEN) {
4404 log_error(ls, "receive_remove from %d bad len %d",
4405 from_nodeid, len);
4406 return;
4407 }
4408
4409 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4410 if (dir_nodeid != dlm_our_nodeid()) {
4411 log_error(ls, "receive_remove from %d bad nodeid %d",
4412 from_nodeid, dir_nodeid);
4413 return;
4414 }
4415
4416
4417
4418
4419
4420
4421
4422
4423
4424
4425 memset(name, 0, sizeof(name));
4426 memcpy(name, ms->m_extra, len);
4427
4428 hash = jhash(name, len, 0);
4429 b = hash & (ls->ls_rsbtbl_size - 1);
4430
4431 spin_lock(&ls->ls_rsbtbl[b].lock);
4432
4433 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4434 if (rv) {
4435
4436 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4437 if (rv) {
4438
4439 log_error(ls, "receive_remove from %d not found %s",
4440 from_nodeid, name);
4441 spin_unlock(&ls->ls_rsbtbl[b].lock);
4442 return;
4443 }
4444 if (r->res_master_nodeid != from_nodeid) {
4445
4446 log_error(ls, "receive_remove keep from %d master %d",
4447 from_nodeid, r->res_master_nodeid);
4448 dlm_print_rsb(r);
4449 spin_unlock(&ls->ls_rsbtbl[b].lock);
4450 return;
4451 }
4452
4453 log_debug(ls, "receive_remove from %d master %d first %x %s",
4454 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4455 name);
4456 spin_unlock(&ls->ls_rsbtbl[b].lock);
4457 return;
4458 }
4459
4460 if (r->res_master_nodeid != from_nodeid) {
4461 log_error(ls, "receive_remove toss from %d master %d",
4462 from_nodeid, r->res_master_nodeid);
4463 dlm_print_rsb(r);
4464 spin_unlock(&ls->ls_rsbtbl[b].lock);
4465 return;
4466 }
4467
4468 if (kref_put(&r->res_ref, kill_rsb)) {
4469 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4470 spin_unlock(&ls->ls_rsbtbl[b].lock);
4471 dlm_free_rsb(r);
4472 } else {
4473 log_error(ls, "receive_remove from %d rsb ref error",
4474 from_nodeid);
4475 dlm_print_rsb(r);
4476 spin_unlock(&ls->ls_rsbtbl[b].lock);
4477 }
4478 }
4479
4480 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4481 {
4482 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4483 }
4484
4485 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4486 {
4487 struct dlm_lkb *lkb;
4488 struct dlm_rsb *r;
4489 int error, mstype, result;
4490 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4491
4492 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4493 if (error)
4494 return error;
4495
4496 r = lkb->lkb_resource;
4497 hold_rsb(r);
4498 lock_rsb(r);
4499
4500 error = validate_message(lkb, ms);
4501 if (error)
4502 goto out;
4503
4504 mstype = lkb->lkb_wait_type;
4505 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4506 if (error) {
4507 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4508 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4509 from_dlm_errno(le32_to_cpu(ms->m_result)));
4510 dlm_dump_rsb(r);
4511 goto out;
4512 }
4513
4514
4515
4516 if (mstype == DLM_MSG_LOOKUP) {
4517 r->res_master_nodeid = from_nodeid;
4518 r->res_nodeid = from_nodeid;
4519 lkb->lkb_nodeid = from_nodeid;
4520 }
4521
4522
4523 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4524
4525 switch (result) {
4526 case -EAGAIN:
4527
4528 queue_cast(r, lkb, -EAGAIN);
4529 confirm_master(r, -EAGAIN);
4530 unhold_lkb(lkb);
4531 break;
4532
4533 case -EINPROGRESS:
4534 case 0:
4535
4536 receive_flags_reply(lkb, ms);
4537 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4538 if (is_altmode(lkb))
4539 munge_altmode(lkb, ms);
4540 if (result) {
4541 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4542 add_timeout(lkb);
4543 } else {
4544 grant_lock_pc(r, lkb, ms);
4545 queue_cast(r, lkb, 0);
4546 }
4547 confirm_master(r, result);
4548 break;
4549
4550 case -EBADR:
4551 case -ENOTBLK:
4552
4553 log_limit(ls, "receive_request_reply %x from %d %d "
4554 "master %d dir %d first %x %s", lkb->lkb_id,
4555 from_nodeid, result, r->res_master_nodeid,
4556 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4557
4558 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4559 r->res_master_nodeid != dlm_our_nodeid()) {
4560
4561 r->res_master_nodeid = 0;
4562 r->res_nodeid = -1;
4563 lkb->lkb_nodeid = -1;
4564 }
4565
4566 if (is_overlap(lkb)) {
4567
4568 queue_cast_overlap(r, lkb);
4569 confirm_master(r, result);
4570 unhold_lkb(lkb);
4571 } else {
4572 _request_lock(r, lkb);
4573
4574 if (r->res_master_nodeid == dlm_our_nodeid())
4575 confirm_master(r, 0);
4576 }
4577 break;
4578
4579 default:
4580 log_error(ls, "receive_request_reply %x error %d",
4581 lkb->lkb_id, result);
4582 }
4583
4584 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4585 log_debug(ls, "receive_request_reply %x result %d unlock",
4586 lkb->lkb_id, result);
4587 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4588 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4589 send_unlock(r, lkb);
4590 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4591 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4592 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4593 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4594 send_cancel(r, lkb);
4595 } else {
4596 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4597 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4598 }
4599 out:
4600 unlock_rsb(r);
4601 put_rsb(r);
4602 dlm_put_lkb(lkb);
4603 return 0;
4604 }
4605
4606 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4607 struct dlm_message *ms)
4608 {
4609
4610 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4611 case -EAGAIN:
4612
4613 queue_cast(r, lkb, -EAGAIN);
4614 break;
4615
4616 case -EDEADLK:
4617 receive_flags_reply(lkb, ms);
4618 revert_lock_pc(r, lkb);
4619 queue_cast(r, lkb, -EDEADLK);
4620 break;
4621
4622 case -EINPROGRESS:
4623
4624 receive_flags_reply(lkb, ms);
4625 if (is_demoted(lkb))
4626 munge_demoted(lkb);
4627 del_lkb(r, lkb);
4628 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4629 add_timeout(lkb);
4630 break;
4631
4632 case 0:
4633
4634 receive_flags_reply(lkb, ms);
4635 if (is_demoted(lkb))
4636 munge_demoted(lkb);
4637 grant_lock_pc(r, lkb, ms);
4638 queue_cast(r, lkb, 0);
4639 break;
4640
4641 default:
4642 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4643 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4644 le32_to_cpu(ms->m_lkid),
4645 from_dlm_errno(le32_to_cpu(ms->m_result)));
4646 dlm_print_rsb(r);
4647 dlm_print_lkb(lkb);
4648 }
4649 }
4650
4651 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4652 {
4653 struct dlm_rsb *r = lkb->lkb_resource;
4654 int error;
4655
4656 hold_rsb(r);
4657 lock_rsb(r);
4658
4659 error = validate_message(lkb, ms);
4660 if (error)
4661 goto out;
4662
4663
4664 error = remove_from_waiters_ms(lkb, ms);
4665 if (error)
4666 goto out;
4667
4668 __receive_convert_reply(r, lkb, ms);
4669 out:
4670 unlock_rsb(r);
4671 put_rsb(r);
4672 }
4673
4674 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4675 {
4676 struct dlm_lkb *lkb;
4677 int error;
4678
4679 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4680 if (error)
4681 return error;
4682
4683 _receive_convert_reply(lkb, ms);
4684 dlm_put_lkb(lkb);
4685 return 0;
4686 }
4687
4688 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4689 {
4690 struct dlm_rsb *r = lkb->lkb_resource;
4691 int error;
4692
4693 hold_rsb(r);
4694 lock_rsb(r);
4695
4696 error = validate_message(lkb, ms);
4697 if (error)
4698 goto out;
4699
4700
4701 error = remove_from_waiters_ms(lkb, ms);
4702 if (error)
4703 goto out;
4704
4705
4706
4707 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4708 case -DLM_EUNLOCK:
4709 receive_flags_reply(lkb, ms);
4710 remove_lock_pc(r, lkb);
4711 queue_cast(r, lkb, -DLM_EUNLOCK);
4712 break;
4713 case -ENOENT:
4714 break;
4715 default:
4716 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4717 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4718 }
4719 out:
4720 unlock_rsb(r);
4721 put_rsb(r);
4722 }
4723
4724 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4725 {
4726 struct dlm_lkb *lkb;
4727 int error;
4728
4729 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4730 if (error)
4731 return error;
4732
4733 _receive_unlock_reply(lkb, ms);
4734 dlm_put_lkb(lkb);
4735 return 0;
4736 }
4737
4738 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4739 {
4740 struct dlm_rsb *r = lkb->lkb_resource;
4741 int error;
4742
4743 hold_rsb(r);
4744 lock_rsb(r);
4745
4746 error = validate_message(lkb, ms);
4747 if (error)
4748 goto out;
4749
4750
4751 error = remove_from_waiters_ms(lkb, ms);
4752 if (error)
4753 goto out;
4754
4755
4756
4757 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4758 case -DLM_ECANCEL:
4759 receive_flags_reply(lkb, ms);
4760 revert_lock_pc(r, lkb);
4761 queue_cast(r, lkb, -DLM_ECANCEL);
4762 break;
4763 case 0:
4764 break;
4765 default:
4766 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4767 lkb->lkb_id,
4768 from_dlm_errno(le32_to_cpu(ms->m_result)));
4769 }
4770 out:
4771 unlock_rsb(r);
4772 put_rsb(r);
4773 }
4774
4775 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4776 {
4777 struct dlm_lkb *lkb;
4778 int error;
4779
4780 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4781 if (error)
4782 return error;
4783
4784 _receive_cancel_reply(lkb, ms);
4785 dlm_put_lkb(lkb);
4786 return 0;
4787 }
4788
4789 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4790 {
4791 struct dlm_lkb *lkb;
4792 struct dlm_rsb *r;
4793 int error, ret_nodeid;
4794 int do_lookup_list = 0;
4795
4796 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4797 if (error) {
4798 log_error(ls, "%s no lkid %x", __func__,
4799 le32_to_cpu(ms->m_lkid));
4800 return;
4801 }
4802
4803
4804
4805
4806 r = lkb->lkb_resource;
4807 hold_rsb(r);
4808 lock_rsb(r);
4809
4810 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4811 if (error)
4812 goto out;
4813
4814 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4815
4816
4817
4818
4819
4820
4821
4822 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4823
4824 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4825 "master %d dir %d our %d first %x %s",
4826 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4827 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4828 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4829 }
4830
4831 if (ret_nodeid == dlm_our_nodeid()) {
4832 r->res_master_nodeid = ret_nodeid;
4833 r->res_nodeid = 0;
4834 do_lookup_list = 1;
4835 r->res_first_lkid = 0;
4836 } else if (ret_nodeid == -1) {
4837
4838 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4839 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4840 r->res_master_nodeid = 0;
4841 r->res_nodeid = -1;
4842 lkb->lkb_nodeid = -1;
4843 } else {
4844
4845 r->res_master_nodeid = ret_nodeid;
4846 r->res_nodeid = ret_nodeid;
4847 }
4848
4849 if (is_overlap(lkb)) {
4850 log_debug(ls, "receive_lookup_reply %x unlock %x",
4851 lkb->lkb_id, lkb->lkb_flags);
4852 queue_cast_overlap(r, lkb);
4853 unhold_lkb(lkb);
4854 goto out_list;
4855 }
4856
4857 _request_lock(r, lkb);
4858
4859 out_list:
4860 if (do_lookup_list)
4861 process_lookup_list(r);
4862 out:
4863 unlock_rsb(r);
4864 put_rsb(r);
4865 dlm_put_lkb(lkb);
4866 }
4867
4868 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4869 uint32_t saved_seq)
4870 {
4871 int error = 0, noent = 0;
4872
4873 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
4874 log_limit(ls, "receive %d from non-member %d %x %x %d",
4875 le32_to_cpu(ms->m_type),
4876 le32_to_cpu(ms->m_header.h_nodeid),
4877 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4878 from_dlm_errno(le32_to_cpu(ms->m_result)));
4879 return;
4880 }
4881
4882 switch (ms->m_type) {
4883
4884
4885
4886 case cpu_to_le32(DLM_MSG_REQUEST):
4887 error = receive_request(ls, ms);
4888 break;
4889
4890 case cpu_to_le32(DLM_MSG_CONVERT):
4891 error = receive_convert(ls, ms);
4892 break;
4893
4894 case cpu_to_le32(DLM_MSG_UNLOCK):
4895 error = receive_unlock(ls, ms);
4896 break;
4897
4898 case cpu_to_le32(DLM_MSG_CANCEL):
4899 noent = 1;
4900 error = receive_cancel(ls, ms);
4901 break;
4902
4903
4904
4905 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4906 error = receive_request_reply(ls, ms);
4907 break;
4908
4909 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4910 error = receive_convert_reply(ls, ms);
4911 break;
4912
4913 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4914 error = receive_unlock_reply(ls, ms);
4915 break;
4916
4917 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4918 error = receive_cancel_reply(ls, ms);
4919 break;
4920
4921
4922
4923 case cpu_to_le32(DLM_MSG_GRANT):
4924 noent = 1;
4925 error = receive_grant(ls, ms);
4926 break;
4927
4928 case cpu_to_le32(DLM_MSG_BAST):
4929 noent = 1;
4930 error = receive_bast(ls, ms);
4931 break;
4932
4933
4934
4935 case cpu_to_le32(DLM_MSG_LOOKUP):
4936 receive_lookup(ls, ms);
4937 break;
4938
4939 case cpu_to_le32(DLM_MSG_REMOVE):
4940 receive_remove(ls, ms);
4941 break;
4942
4943
4944
4945 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4946 receive_lookup_reply(ls, ms);
4947 break;
4948
4949
4950
4951 case cpu_to_le32(DLM_MSG_PURGE):
4952 receive_purge(ls, ms);
4953 break;
4954
4955 default:
4956 log_error(ls, "unknown message type %d",
4957 le32_to_cpu(ms->m_type));
4958 }
4959
4960
4961
4962
4963
4964
4965
4966
4967
4968
4969
4970
4971 if (error == -ENOENT && noent) {
4972 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4973 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4974 le32_to_cpu(ms->m_header.h_nodeid),
4975 le32_to_cpu(ms->m_lkid), saved_seq);
4976 } else if (error == -ENOENT) {
4977 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4978 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4979 le32_to_cpu(ms->m_header.h_nodeid),
4980 le32_to_cpu(ms->m_lkid), saved_seq);
4981
4982 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4983 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4984 }
4985
4986 if (error == -EINVAL) {
4987 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4988 "saved_seq %u",
4989 le32_to_cpu(ms->m_type),
4990 le32_to_cpu(ms->m_header.h_nodeid),
4991 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4992 saved_seq);
4993 }
4994 }
4995
4996
4997
4998
4999
5000
5001
5002
5003
5004 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5005 int nodeid)
5006 {
5007 if (dlm_locking_stopped(ls)) {
5008
5009
5010
5011 if (!ls->ls_generation) {
5012 log_limit(ls, "receive %d from %d ignore old gen",
5013 le32_to_cpu(ms->m_type), nodeid);
5014 return;
5015 }
5016
5017 dlm_add_requestqueue(ls, nodeid, ms);
5018 } else {
5019 dlm_wait_requestqueue(ls);
5020 _receive_message(ls, ms, 0);
5021 }
5022 }
5023
5024
5025
5026
5027 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5028 uint32_t saved_seq)
5029 {
5030 _receive_message(ls, ms, saved_seq);
5031 }
5032
5033
5034
5035
5036
5037
5038 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5039 {
5040 struct dlm_header *hd = &p->header;
5041 struct dlm_ls *ls;
5042 int type = 0;
5043
5044 switch (hd->h_cmd) {
5045 case DLM_MSG:
5046 type = le32_to_cpu(p->message.m_type);
5047 break;
5048 case DLM_RCOM:
5049 type = le32_to_cpu(p->rcom.rc_type);
5050 break;
5051 default:
5052 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5053 return;
5054 }
5055
5056 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
5057 log_print("invalid h_nodeid %d from %d lockspace %x",
5058 le32_to_cpu(hd->h_nodeid), nodeid,
5059 le32_to_cpu(hd->u.h_lockspace));
5060 return;
5061 }
5062
5063 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
5064 if (!ls) {
5065 if (dlm_config.ci_log_debug) {
5066 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5067 "%u from %d cmd %d type %d\n",
5068 le32_to_cpu(hd->u.h_lockspace), nodeid,
5069 hd->h_cmd, type);
5070 }
5071
5072 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5073 dlm_send_ls_not_ready(nodeid, &p->rcom);
5074 return;
5075 }
5076
5077
5078
5079
5080 down_read(&ls->ls_recv_active);
5081 if (hd->h_cmd == DLM_MSG)
5082 dlm_receive_message(ls, &p->message, nodeid);
5083 else
5084 dlm_receive_rcom(ls, &p->rcom, nodeid);
5085 up_read(&ls->ls_recv_active);
5086
5087 dlm_put_lockspace(ls);
5088 }
5089
5090 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5091 struct dlm_message *ms_stub)
5092 {
5093 if (middle_conversion(lkb)) {
5094 hold_lkb(lkb);
5095 memset(ms_stub, 0, sizeof(struct dlm_message));
5096 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5097 ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5098 ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5099 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5100 _receive_convert_reply(lkb, ms_stub);
5101
5102
5103 lkb->lkb_grmode = DLM_LOCK_IV;
5104 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5105 unhold_lkb(lkb);
5106
5107 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5108 lkb->lkb_flags |= DLM_IFL_RESEND;
5109 }
5110
5111
5112
5113 }
5114
5115
5116
5117
5118 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5119 int dir_nodeid)
5120 {
5121 if (dlm_no_directory(ls))
5122 return 1;
5123
5124 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5125 return 1;
5126
5127 return 0;
5128 }
5129
5130
5131
5132
5133
5134
5135
5136 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5137 {
5138 struct dlm_lkb *lkb, *safe;
5139 struct dlm_message *ms_stub;
5140 int wait_type, stub_unlock_result, stub_cancel_result;
5141 int dir_nodeid;
5142
5143 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5144 if (!ms_stub)
5145 return;
5146
5147 mutex_lock(&ls->ls_waiters_mutex);
5148
5149 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5150
5151 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5152
5153
5154
5155
5156 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5157 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5158 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5159 lkb->lkb_id,
5160 lkb->lkb_remid,
5161 lkb->lkb_wait_type,
5162 lkb->lkb_resource->res_nodeid,
5163 lkb->lkb_nodeid,
5164 lkb->lkb_wait_nodeid,
5165 dir_nodeid);
5166 }
5167
5168
5169
5170
5171 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5172 lkb->lkb_flags |= DLM_IFL_RESEND;
5173 continue;
5174 }
5175
5176 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5177 continue;
5178
5179 wait_type = lkb->lkb_wait_type;
5180 stub_unlock_result = -DLM_EUNLOCK;
5181 stub_cancel_result = -DLM_ECANCEL;
5182
5183
5184
5185
5186
5187
5188 if (!wait_type) {
5189 if (is_overlap_cancel(lkb)) {
5190 wait_type = DLM_MSG_CANCEL;
5191 if (lkb->lkb_grmode == DLM_LOCK_IV)
5192 stub_cancel_result = 0;
5193 }
5194 if (is_overlap_unlock(lkb)) {
5195 wait_type = DLM_MSG_UNLOCK;
5196 if (lkb->lkb_grmode == DLM_LOCK_IV)
5197 stub_unlock_result = -ENOENT;
5198 }
5199
5200 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5201 lkb->lkb_id, lkb->lkb_flags, wait_type,
5202 stub_cancel_result, stub_unlock_result);
5203 }
5204
5205 switch (wait_type) {
5206
5207 case DLM_MSG_REQUEST:
5208 lkb->lkb_flags |= DLM_IFL_RESEND;
5209 break;
5210
5211 case DLM_MSG_CONVERT:
5212 recover_convert_waiter(ls, lkb, ms_stub);
5213 break;
5214
5215 case DLM_MSG_UNLOCK:
5216 hold_lkb(lkb);
5217 memset(ms_stub, 0, sizeof(struct dlm_message));
5218 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5219 ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5220 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result));
5221 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5222 _receive_unlock_reply(lkb, ms_stub);
5223 dlm_put_lkb(lkb);
5224 break;
5225
5226 case DLM_MSG_CANCEL:
5227 hold_lkb(lkb);
5228 memset(ms_stub, 0, sizeof(struct dlm_message));
5229 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5230 ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5231 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result));
5232 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5233 _receive_cancel_reply(lkb, ms_stub);
5234 dlm_put_lkb(lkb);
5235 break;
5236
5237 default:
5238 log_error(ls, "invalid lkb wait_type %d %d",
5239 lkb->lkb_wait_type, wait_type);
5240 }
5241 schedule();
5242 }
5243 mutex_unlock(&ls->ls_waiters_mutex);
5244 kfree(ms_stub);
5245 }
5246
5247 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5248 {
5249 struct dlm_lkb *lkb = NULL, *iter;
5250
5251 mutex_lock(&ls->ls_waiters_mutex);
5252 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5253 if (iter->lkb_flags & DLM_IFL_RESEND) {
5254 hold_lkb(iter);
5255 lkb = iter;
5256 break;
5257 }
5258 }
5259 mutex_unlock(&ls->ls_waiters_mutex);
5260
5261 return lkb;
5262 }
5263
5264
5265
5266
5267
5268
5269
5270
5271
5272
5273
5274
5275
5276
5277
5278
5279
5280 int dlm_recover_waiters_post(struct dlm_ls *ls)
5281 {
5282 struct dlm_lkb *lkb;
5283 struct dlm_rsb *r;
5284 int error = 0, mstype, err, oc, ou;
5285
5286 while (1) {
5287 if (dlm_locking_stopped(ls)) {
5288 log_debug(ls, "recover_waiters_post aborted");
5289 error = -EINTR;
5290 break;
5291 }
5292
5293 lkb = find_resend_waiter(ls);
5294 if (!lkb)
5295 break;
5296
5297 r = lkb->lkb_resource;
5298 hold_rsb(r);
5299 lock_rsb(r);
5300
5301 mstype = lkb->lkb_wait_type;
5302 oc = is_overlap_cancel(lkb);
5303 ou = is_overlap_unlock(lkb);
5304 err = 0;
5305
5306 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5307 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5308 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5309 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5310 dlm_dir_nodeid(r), oc, ou);
5311
5312
5313
5314
5315
5316 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5317 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5318 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5319 lkb->lkb_wait_type = 0;
5320
5321
5322
5323 while (lkb->lkb_wait_count) {
5324 lkb->lkb_wait_count--;
5325 unhold_lkb(lkb);
5326 }
5327 mutex_lock(&ls->ls_waiters_mutex);
5328 list_del_init(&lkb->lkb_wait_reply);
5329 mutex_unlock(&ls->ls_waiters_mutex);
5330
5331 if (oc || ou) {
5332
5333 switch (mstype) {
5334 case DLM_MSG_LOOKUP:
5335 case DLM_MSG_REQUEST:
5336 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5337 -DLM_ECANCEL);
5338 unhold_lkb(lkb);
5339 break;
5340 case DLM_MSG_CONVERT:
5341 if (oc) {
5342 queue_cast(r, lkb, -DLM_ECANCEL);
5343 } else {
5344 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5345 _unlock_lock(r, lkb);
5346 }
5347 break;
5348 default:
5349 err = 1;
5350 }
5351 } else {
5352 switch (mstype) {
5353 case DLM_MSG_LOOKUP:
5354 case DLM_MSG_REQUEST:
5355 _request_lock(r, lkb);
5356 if (is_master(r))
5357 confirm_master(r, 0);
5358 break;
5359 case DLM_MSG_CONVERT:
5360 _convert_lock(r, lkb);
5361 break;
5362 default:
5363 err = 1;
5364 }
5365 }
5366
5367 if (err) {
5368 log_error(ls, "waiter %x msg %d r_nodeid %d "
5369 "dir_nodeid %d overlap %d %d",
5370 lkb->lkb_id, mstype, r->res_nodeid,
5371 dlm_dir_nodeid(r), oc, ou);
5372 }
5373 unlock_rsb(r);
5374 put_rsb(r);
5375 dlm_put_lkb(lkb);
5376 }
5377
5378 return error;
5379 }
5380
5381 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5382 struct list_head *list)
5383 {
5384 struct dlm_lkb *lkb, *safe;
5385
5386 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5387 if (!is_master_copy(lkb))
5388 continue;
5389
5390
5391
5392
5393 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5394 continue;
5395
5396 del_lkb(r, lkb);
5397
5398
5399 if (!dlm_put_lkb(lkb))
5400 log_error(ls, "purged mstcpy lkb not released");
5401 }
5402 }
5403
5404 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5405 {
5406 struct dlm_ls *ls = r->res_ls;
5407
5408 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5409 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5410 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5411 }
5412
5413 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5414 struct list_head *list,
5415 int nodeid_gone, unsigned int *count)
5416 {
5417 struct dlm_lkb *lkb, *safe;
5418
5419 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5420 if (!is_master_copy(lkb))
5421 continue;
5422
5423 if ((lkb->lkb_nodeid == nodeid_gone) ||
5424 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5425
5426
5427
5428 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5429 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5430 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5431 }
5432
5433 del_lkb(r, lkb);
5434
5435
5436 if (!dlm_put_lkb(lkb))
5437 log_error(ls, "purged dead lkb not released");
5438
5439 rsb_set_flag(r, RSB_RECOVER_GRANT);
5440
5441 (*count)++;
5442 }
5443 }
5444 }
5445
5446
5447
5448 void dlm_recover_purge(struct dlm_ls *ls)
5449 {
5450 struct dlm_rsb *r;
5451 struct dlm_member *memb;
5452 int nodes_count = 0;
5453 int nodeid_gone = 0;
5454 unsigned int lkb_count = 0;
5455
5456
5457
5458
5459 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5460 nodes_count++;
5461 nodeid_gone = memb->nodeid;
5462 }
5463
5464 if (!nodes_count)
5465 return;
5466
5467 down_write(&ls->ls_root_sem);
5468 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5469 hold_rsb(r);
5470 lock_rsb(r);
5471 if (is_master(r)) {
5472 purge_dead_list(ls, r, &r->res_grantqueue,
5473 nodeid_gone, &lkb_count);
5474 purge_dead_list(ls, r, &r->res_convertqueue,
5475 nodeid_gone, &lkb_count);
5476 purge_dead_list(ls, r, &r->res_waitqueue,
5477 nodeid_gone, &lkb_count);
5478 }
5479 unlock_rsb(r);
5480 unhold_rsb(r);
5481 cond_resched();
5482 }
5483 up_write(&ls->ls_root_sem);
5484
5485 if (lkb_count)
5486 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5487 lkb_count, nodes_count);
5488 }
5489
5490 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5491 {
5492 struct rb_node *n;
5493 struct dlm_rsb *r;
5494
5495 spin_lock(&ls->ls_rsbtbl[bucket].lock);
5496 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5497 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5498
5499 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5500 continue;
5501 if (!is_master(r)) {
5502 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5503 continue;
5504 }
5505 hold_rsb(r);
5506 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5507 return r;
5508 }
5509 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5510 return NULL;
5511 }
5512
5513
5514
5515
5516
5517
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527
5528
5529
5530 void dlm_recover_grant(struct dlm_ls *ls)
5531 {
5532 struct dlm_rsb *r;
5533 int bucket = 0;
5534 unsigned int count = 0;
5535 unsigned int rsb_count = 0;
5536 unsigned int lkb_count = 0;
5537
5538 while (1) {
5539 r = find_grant_rsb(ls, bucket);
5540 if (!r) {
5541 if (bucket == ls->ls_rsbtbl_size - 1)
5542 break;
5543 bucket++;
5544 continue;
5545 }
5546 rsb_count++;
5547 count = 0;
5548 lock_rsb(r);
5549
5550 grant_pending_locks(r, &count);
5551 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5552 lkb_count += count;
5553 confirm_master(r, 0);
5554 unlock_rsb(r);
5555 put_rsb(r);
5556 cond_resched();
5557 }
5558
5559 if (lkb_count)
5560 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5561 lkb_count, rsb_count);
5562 }
5563
5564 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5565 uint32_t remid)
5566 {
5567 struct dlm_lkb *lkb;
5568
5569 list_for_each_entry(lkb, head, lkb_statequeue) {
5570 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5571 return lkb;
5572 }
5573 return NULL;
5574 }
5575
5576 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5577 uint32_t remid)
5578 {
5579 struct dlm_lkb *lkb;
5580
5581 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5582 if (lkb)
5583 return lkb;
5584 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5585 if (lkb)
5586 return lkb;
5587 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5588 if (lkb)
5589 return lkb;
5590 return NULL;
5591 }
5592
5593
5594 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5595 struct dlm_rsb *r, struct dlm_rcom *rc)
5596 {
5597 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5598
5599 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5600 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5601 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5602 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5603 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5604 lkb->lkb_flags |= DLM_IFL_MSTCPY;
5605 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5606 lkb->lkb_rqmode = rl->rl_rqmode;
5607 lkb->lkb_grmode = rl->rl_grmode;
5608
5609
5610 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5611 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5612
5613 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5614 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5615 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5616 if (lvblen > ls->ls_lvblen)
5617 return -EINVAL;
5618 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5619 if (!lkb->lkb_lvbptr)
5620 return -ENOMEM;
5621 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5622 }
5623
5624
5625
5626
5627
5628 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5629 middle_conversion(lkb)) {
5630 rl->rl_status = DLM_LKSTS_CONVERT;
5631 lkb->lkb_grmode = DLM_LOCK_IV;
5632 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5633 }
5634
5635 return 0;
5636 }
5637
5638
5639
5640
5641
5642
5643
5644
5645 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5646 {
5647 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5648 struct dlm_rsb *r;
5649 struct dlm_lkb *lkb;
5650 uint32_t remid = 0;
5651 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5652 int error;
5653
5654 if (rl->rl_parent_lkid) {
5655 error = -EOPNOTSUPP;
5656 goto out;
5657 }
5658
5659 remid = le32_to_cpu(rl->rl_lkid);
5660
5661
5662
5663
5664
5665
5666
5667
5668
5669 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5670 from_nodeid, R_RECEIVE_RECOVER, &r);
5671 if (error)
5672 goto out;
5673
5674 lock_rsb(r);
5675
5676 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5677 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5678 from_nodeid, remid);
5679 error = -EBADR;
5680 goto out_unlock;
5681 }
5682
5683 lkb = search_remid(r, from_nodeid, remid);
5684 if (lkb) {
5685 error = -EEXIST;
5686 goto out_remid;
5687 }
5688
5689 error = create_lkb(ls, &lkb);
5690 if (error)
5691 goto out_unlock;
5692
5693 error = receive_rcom_lock_args(ls, lkb, r, rc);
5694 if (error) {
5695 __put_lkb(ls, lkb);
5696 goto out_unlock;
5697 }
5698
5699 attach_lkb(r, lkb);
5700 add_lkb(r, lkb, rl->rl_status);
5701 ls->ls_recover_locks_in++;
5702
5703 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5704 rsb_set_flag(r, RSB_RECOVER_GRANT);
5705
5706 out_remid:
5707
5708
5709 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5710
5711 lkb->lkb_recover_seq = ls->ls_recover_seq;
5712
5713 out_unlock:
5714 unlock_rsb(r);
5715 put_rsb(r);
5716 out:
5717 if (error && error != -EEXIST)
5718 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5719 from_nodeid, remid, error);
5720 rl->rl_result = cpu_to_le32(error);
5721 return error;
5722 }
5723
5724
5725 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5726 {
5727 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5728 struct dlm_rsb *r;
5729 struct dlm_lkb *lkb;
5730 uint32_t lkid, remid;
5731 int error, result;
5732
5733 lkid = le32_to_cpu(rl->rl_lkid);
5734 remid = le32_to_cpu(rl->rl_remid);
5735 result = le32_to_cpu(rl->rl_result);
5736
5737 error = find_lkb(ls, lkid, &lkb);
5738 if (error) {
5739 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5740 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5741 result);
5742 return error;
5743 }
5744
5745 r = lkb->lkb_resource;
5746 hold_rsb(r);
5747 lock_rsb(r);
5748
5749 if (!is_process_copy(lkb)) {
5750 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5751 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5752 result);
5753 dlm_dump_rsb(r);
5754 unlock_rsb(r);
5755 put_rsb(r);
5756 dlm_put_lkb(lkb);
5757 return -EINVAL;
5758 }
5759
5760 switch (result) {
5761 case -EBADR:
5762
5763
5764
5765
5766 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5767 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5768 result);
5769
5770 dlm_send_rcom_lock(r, lkb);
5771 goto out;
5772 case -EEXIST:
5773 case 0:
5774 lkb->lkb_remid = remid;
5775 break;
5776 default:
5777 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5778 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5779 result);
5780 }
5781
5782
5783
5784 dlm_recovered_lock(r);
5785 out:
5786 unlock_rsb(r);
5787 put_rsb(r);
5788 dlm_put_lkb(lkb);
5789
5790 return 0;
5791 }
5792
5793 #ifdef CONFIG_DLM_DEPRECATED_API
5794 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5795 int mode, uint32_t flags, void *name, unsigned int namelen,
5796 unsigned long timeout_cs)
5797 #else
5798 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5799 int mode, uint32_t flags, void *name, unsigned int namelen)
5800 #endif
5801 {
5802 struct dlm_lkb *lkb;
5803 struct dlm_args args;
5804 int error;
5805
5806 dlm_lock_recovery(ls);
5807
5808 error = create_lkb(ls, &lkb);
5809 if (error) {
5810 kfree(ua);
5811 goto out;
5812 }
5813
5814 if (flags & DLM_LKF_VALBLK) {
5815 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5816 if (!ua->lksb.sb_lvbptr) {
5817 kfree(ua);
5818 __put_lkb(ls, lkb);
5819 error = -ENOMEM;
5820 goto out;
5821 }
5822 }
5823 #ifdef CONFIG_DLM_DEPRECATED_API
5824 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5825 fake_astfn, ua, fake_bastfn, &args);
5826 #else
5827 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5828 fake_bastfn, &args);
5829 #endif
5830 if (error) {
5831 kfree(ua->lksb.sb_lvbptr);
5832 ua->lksb.sb_lvbptr = NULL;
5833 kfree(ua);
5834 __put_lkb(ls, lkb);
5835 goto out;
5836 }
5837
5838
5839
5840
5841 lkb->lkb_flags |= DLM_IFL_USER;
5842 error = request_lock(ls, lkb, name, namelen, &args);
5843
5844 switch (error) {
5845 case 0:
5846 break;
5847 case -EINPROGRESS:
5848 error = 0;
5849 break;
5850 case -EAGAIN:
5851 error = 0;
5852 fallthrough;
5853 default:
5854 __put_lkb(ls, lkb);
5855 goto out;
5856 }
5857
5858
5859 spin_lock(&ua->proc->locks_spin);
5860 hold_lkb(lkb);
5861 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5862 spin_unlock(&ua->proc->locks_spin);
5863 out:
5864 dlm_unlock_recovery(ls);
5865 return error;
5866 }
5867
5868 #ifdef CONFIG_DLM_DEPRECATED_API
5869 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5870 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5871 unsigned long timeout_cs)
5872 #else
5873 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5874 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5875 #endif
5876 {
5877 struct dlm_lkb *lkb;
5878 struct dlm_args args;
5879 struct dlm_user_args *ua;
5880 int error;
5881
5882 dlm_lock_recovery(ls);
5883
5884 error = find_lkb(ls, lkid, &lkb);
5885 if (error)
5886 goto out;
5887
5888
5889
5890
5891 ua = lkb->lkb_ua;
5892
5893 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5894 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5895 if (!ua->lksb.sb_lvbptr) {
5896 error = -ENOMEM;
5897 goto out_put;
5898 }
5899 }
5900 if (lvb_in && ua->lksb.sb_lvbptr)
5901 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5902
5903 ua->xid = ua_tmp->xid;
5904 ua->castparam = ua_tmp->castparam;
5905 ua->castaddr = ua_tmp->castaddr;
5906 ua->bastparam = ua_tmp->bastparam;
5907 ua->bastaddr = ua_tmp->bastaddr;
5908 ua->user_lksb = ua_tmp->user_lksb;
5909
5910 #ifdef CONFIG_DLM_DEPRECATED_API
5911 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5912 fake_astfn, ua, fake_bastfn, &args);
5913 #else
5914 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5915 fake_bastfn, &args);
5916 #endif
5917 if (error)
5918 goto out_put;
5919
5920 error = convert_lock(ls, lkb, &args);
5921
5922 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5923 error = 0;
5924 out_put:
5925 dlm_put_lkb(lkb);
5926 out:
5927 dlm_unlock_recovery(ls);
5928 kfree(ua_tmp);
5929 return error;
5930 }
5931
5932
5933
5934
5935
5936
5937
5938 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5939 int mode, uint32_t flags, void *name, unsigned int namelen,
5940 uint32_t *lkid)
5941 {
5942 struct dlm_lkb *lkb = NULL, *iter;
5943 struct dlm_user_args *ua;
5944 int found_other_mode = 0;
5945 int rv = 0;
5946
5947 mutex_lock(&ls->ls_orphans_mutex);
5948 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5949 if (iter->lkb_resource->res_length != namelen)
5950 continue;
5951 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5952 continue;
5953 if (iter->lkb_grmode != mode) {
5954 found_other_mode = 1;
5955 continue;
5956 }
5957
5958 lkb = iter;
5959 list_del_init(&iter->lkb_ownqueue);
5960 iter->lkb_flags &= ~DLM_IFL_ORPHAN;
5961 *lkid = iter->lkb_id;
5962 break;
5963 }
5964 mutex_unlock(&ls->ls_orphans_mutex);
5965
5966 if (!lkb && found_other_mode) {
5967 rv = -EAGAIN;
5968 goto out;
5969 }
5970
5971 if (!lkb) {
5972 rv = -ENOENT;
5973 goto out;
5974 }
5975
5976 lkb->lkb_exflags = flags;
5977 lkb->lkb_ownpid = (int) current->pid;
5978
5979 ua = lkb->lkb_ua;
5980
5981 ua->proc = ua_tmp->proc;
5982 ua->xid = ua_tmp->xid;
5983 ua->castparam = ua_tmp->castparam;
5984 ua->castaddr = ua_tmp->castaddr;
5985 ua->bastparam = ua_tmp->bastparam;
5986 ua->bastaddr = ua_tmp->bastaddr;
5987 ua->user_lksb = ua_tmp->user_lksb;
5988
5989
5990
5991
5992
5993
5994
5995 spin_lock(&ua->proc->locks_spin);
5996 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5997 spin_unlock(&ua->proc->locks_spin);
5998 out:
5999 kfree(ua_tmp);
6000 return rv;
6001 }
6002
6003 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6004 uint32_t flags, uint32_t lkid, char *lvb_in)
6005 {
6006 struct dlm_lkb *lkb;
6007 struct dlm_args args;
6008 struct dlm_user_args *ua;
6009 int error;
6010
6011 dlm_lock_recovery(ls);
6012
6013 error = find_lkb(ls, lkid, &lkb);
6014 if (error)
6015 goto out;
6016
6017 ua = lkb->lkb_ua;
6018
6019 if (lvb_in && ua->lksb.sb_lvbptr)
6020 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
6021 if (ua_tmp->castparam)
6022 ua->castparam = ua_tmp->castparam;
6023 ua->user_lksb = ua_tmp->user_lksb;
6024
6025 error = set_unlock_args(flags, ua, &args);
6026 if (error)
6027 goto out_put;
6028
6029 error = unlock_lock(ls, lkb, &args);
6030
6031 if (error == -DLM_EUNLOCK)
6032 error = 0;
6033
6034 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6035 error = 0;
6036 if (error)
6037 goto out_put;
6038
6039 spin_lock(&ua->proc->locks_spin);
6040
6041 if (!list_empty(&lkb->lkb_ownqueue))
6042 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6043 spin_unlock(&ua->proc->locks_spin);
6044 out_put:
6045 dlm_put_lkb(lkb);
6046 out:
6047 dlm_unlock_recovery(ls);
6048 kfree(ua_tmp);
6049 return error;
6050 }
6051
6052 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6053 uint32_t flags, uint32_t lkid)
6054 {
6055 struct dlm_lkb *lkb;
6056 struct dlm_args args;
6057 struct dlm_user_args *ua;
6058 int error;
6059
6060 dlm_lock_recovery(ls);
6061
6062 error = find_lkb(ls, lkid, &lkb);
6063 if (error)
6064 goto out;
6065
6066 ua = lkb->lkb_ua;
6067 if (ua_tmp->castparam)
6068 ua->castparam = ua_tmp->castparam;
6069 ua->user_lksb = ua_tmp->user_lksb;
6070
6071 error = set_unlock_args(flags, ua, &args);
6072 if (error)
6073 goto out_put;
6074
6075 error = cancel_lock(ls, lkb, &args);
6076
6077 if (error == -DLM_ECANCEL)
6078 error = 0;
6079
6080 if (error == -EBUSY)
6081 error = 0;
6082 out_put:
6083 dlm_put_lkb(lkb);
6084 out:
6085 dlm_unlock_recovery(ls);
6086 kfree(ua_tmp);
6087 return error;
6088 }
6089
6090 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6091 {
6092 struct dlm_lkb *lkb;
6093 struct dlm_args args;
6094 struct dlm_user_args *ua;
6095 struct dlm_rsb *r;
6096 int error;
6097
6098 dlm_lock_recovery(ls);
6099
6100 error = find_lkb(ls, lkid, &lkb);
6101 if (error)
6102 goto out;
6103
6104 ua = lkb->lkb_ua;
6105
6106 error = set_unlock_args(flags, ua, &args);
6107 if (error)
6108 goto out_put;
6109
6110
6111
6112 r = lkb->lkb_resource;
6113 hold_rsb(r);
6114 lock_rsb(r);
6115
6116 error = validate_unlock_args(lkb, &args);
6117 if (error)
6118 goto out_r;
6119 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6120
6121 error = _cancel_lock(r, lkb);
6122 out_r:
6123 unlock_rsb(r);
6124 put_rsb(r);
6125
6126 if (error == -DLM_ECANCEL)
6127 error = 0;
6128
6129 if (error == -EBUSY)
6130 error = 0;
6131 out_put:
6132 dlm_put_lkb(lkb);
6133 out:
6134 dlm_unlock_recovery(ls);
6135 return error;
6136 }
6137
6138
6139
6140
6141 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6142 {
6143 struct dlm_args args;
6144 int error;
6145
6146 hold_lkb(lkb);
6147 mutex_lock(&ls->ls_orphans_mutex);
6148 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6149 mutex_unlock(&ls->ls_orphans_mutex);
6150
6151 set_unlock_args(0, lkb->lkb_ua, &args);
6152
6153 error = cancel_lock(ls, lkb, &args);
6154 if (error == -DLM_ECANCEL)
6155 error = 0;
6156 return error;
6157 }
6158
6159
6160
6161
6162
6163
6164 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6165 {
6166 struct dlm_args args;
6167 int error;
6168
6169 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6170 lkb->lkb_ua, &args);
6171
6172 error = unlock_lock(ls, lkb, &args);
6173 if (error == -DLM_EUNLOCK)
6174 error = 0;
6175 return error;
6176 }
6177
6178
6179
6180
6181
6182 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6183 struct dlm_user_proc *proc)
6184 {
6185 struct dlm_lkb *lkb = NULL;
6186
6187 mutex_lock(&ls->ls_clear_proc_locks);
6188 if (list_empty(&proc->locks))
6189 goto out;
6190
6191 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6192 list_del_init(&lkb->lkb_ownqueue);
6193
6194 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6195 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6196 else
6197 lkb->lkb_flags |= DLM_IFL_DEAD;
6198 out:
6199 mutex_unlock(&ls->ls_clear_proc_locks);
6200 return lkb;
6201 }
6202
6203
6204
6205
6206
6207
6208
6209
6210
6211
6212
6213 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6214 {
6215 struct dlm_lkb *lkb, *safe;
6216
6217 dlm_lock_recovery(ls);
6218
6219 while (1) {
6220 lkb = del_proc_lock(ls, proc);
6221 if (!lkb)
6222 break;
6223 del_timeout(lkb);
6224 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6225 orphan_proc_lock(ls, lkb);
6226 else
6227 unlock_proc_lock(ls, lkb);
6228
6229
6230
6231
6232
6233 dlm_put_lkb(lkb);
6234 }
6235
6236 mutex_lock(&ls->ls_clear_proc_locks);
6237
6238
6239 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6240 list_del_init(&lkb->lkb_ownqueue);
6241 lkb->lkb_flags |= DLM_IFL_DEAD;
6242 dlm_put_lkb(lkb);
6243 }
6244
6245 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6246 memset(&lkb->lkb_callbacks, 0,
6247 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6248 list_del_init(&lkb->lkb_cb_list);
6249 dlm_put_lkb(lkb);
6250 }
6251
6252 mutex_unlock(&ls->ls_clear_proc_locks);
6253 dlm_unlock_recovery(ls);
6254 }
6255
6256 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6257 {
6258 struct dlm_lkb *lkb, *safe;
6259
6260 while (1) {
6261 lkb = NULL;
6262 spin_lock(&proc->locks_spin);
6263 if (!list_empty(&proc->locks)) {
6264 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6265 lkb_ownqueue);
6266 list_del_init(&lkb->lkb_ownqueue);
6267 }
6268 spin_unlock(&proc->locks_spin);
6269
6270 if (!lkb)
6271 break;
6272
6273 lkb->lkb_flags |= DLM_IFL_DEAD;
6274 unlock_proc_lock(ls, lkb);
6275 dlm_put_lkb(lkb);
6276 }
6277
6278 spin_lock(&proc->locks_spin);
6279 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6280 list_del_init(&lkb->lkb_ownqueue);
6281 lkb->lkb_flags |= DLM_IFL_DEAD;
6282 dlm_put_lkb(lkb);
6283 }
6284 spin_unlock(&proc->locks_spin);
6285
6286 spin_lock(&proc->asts_spin);
6287 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6288 memset(&lkb->lkb_callbacks, 0,
6289 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6290 list_del_init(&lkb->lkb_cb_list);
6291 dlm_put_lkb(lkb);
6292 }
6293 spin_unlock(&proc->asts_spin);
6294 }
6295
6296
6297
6298 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6299 {
6300 struct dlm_lkb *lkb, *safe;
6301
6302 mutex_lock(&ls->ls_orphans_mutex);
6303 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6304 if (pid && lkb->lkb_ownpid != pid)
6305 continue;
6306 unlock_proc_lock(ls, lkb);
6307 list_del_init(&lkb->lkb_ownqueue);
6308 dlm_put_lkb(lkb);
6309 }
6310 mutex_unlock(&ls->ls_orphans_mutex);
6311 }
6312
6313 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6314 {
6315 struct dlm_message *ms;
6316 struct dlm_mhandle *mh;
6317 int error;
6318
6319 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6320 DLM_MSG_PURGE, &ms, &mh);
6321 if (error)
6322 return error;
6323 ms->m_nodeid = cpu_to_le32(nodeid);
6324 ms->m_pid = cpu_to_le32(pid);
6325
6326 return send_message(mh, ms);
6327 }
6328
6329 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6330 int nodeid, int pid)
6331 {
6332 int error = 0;
6333
6334 if (nodeid && (nodeid != dlm_our_nodeid())) {
6335 error = send_purge(ls, nodeid, pid);
6336 } else {
6337 dlm_lock_recovery(ls);
6338 if (pid == current->pid)
6339 purge_proc_locks(ls, proc);
6340 else
6341 do_purge(ls, nodeid, pid);
6342 dlm_unlock_recovery(ls);
6343 }
6344 return error;
6345 }
6346
6347
6348 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6349 int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
6350 {
6351 struct dlm_lksb *lksb;
6352 struct dlm_lkb *lkb;
6353 struct dlm_rsb *r;
6354 int error;
6355
6356
6357 if (lkb_flags & DLM_IFL_USER)
6358 return -EOPNOTSUPP;
6359
6360 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6361 if (!lksb)
6362 return -ENOMEM;
6363
6364 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6365 if (error) {
6366 kfree(lksb);
6367 return error;
6368 }
6369
6370 lkb->lkb_flags = lkb_flags;
6371 lkb->lkb_nodeid = lkb_nodeid;
6372 lkb->lkb_lksb = lksb;
6373
6374 if (~lkb_flags & DLM_IFL_USER)
6375 lkb->lkb_astparam = (void *)0xDEADBEEF;
6376
6377 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6378 if (error) {
6379 kfree(lksb);
6380 __put_lkb(ls, lkb);
6381 return error;
6382 }
6383
6384 lock_rsb(r);
6385 attach_lkb(r, lkb);
6386 add_lkb(r, lkb, lkb_status);
6387 unlock_rsb(r);
6388 put_rsb(r);
6389
6390 return 0;
6391 }
6392
6393 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6394 int mstype, int to_nodeid)
6395 {
6396 struct dlm_lkb *lkb;
6397 int error;
6398
6399 error = find_lkb(ls, lkb_id, &lkb);
6400 if (error)
6401 return error;
6402
6403 error = add_to_waiters(lkb, mstype, to_nodeid);
6404 dlm_put_lkb(lkb);
6405 return error;
6406 }
6407