0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include <linux/module.h>
0012 #include <linux/fs.h>
0013 #include <linux/types.h>
0014 #include <linux/highmem.h>
0015 #include <linux/init.h>
0016 #include <linux/sysctl.h>
0017 #include <linux/random.h>
0018 #include <linux/blkdev.h>
0019 #include <linux/socket.h>
0020 #include <linux/inet.h>
0021 #include <linux/timer.h>
0022 #include <linux/kthread.h>
0023 #include <linux/delay.h>
0024
0025
0026 #include "../cluster/heartbeat.h"
0027 #include "../cluster/nodemanager.h"
0028 #include "../cluster/tcp.h"
0029
0030 #include "dlmapi.h"
0031 #include "dlmcommon.h"
0032 #include "dlmdomain.h"
0033
0034 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
0035 #include "../cluster/masklog.h"
0036
0037 static int dlm_thread(void *data);
0038 static void dlm_flush_asts(struct dlm_ctxt *dlm);
0039
0040
0041
/*
 * Sleep until none of the bits in @flags remain set in res->state.
 *
 * Must be called with res->spinlock held.  The spinlock is dropped
 * while sleeping and reacquired before each recheck, so the resource
 * state may have changed in other ways across this call; the caller
 * must revalidate anything else it cares about.
 */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
	DECLARE_WAITQUEUE(wait, current);

	assert_spin_locked(&res->spinlock);

	add_wait_queue(&res->wq, &wait);
repeat:
	set_current_state(TASK_UNINTERRUPTIBLE);
	if (res->state & flags) {
		/* Drop the lock so whoever clears the flag can take it
		 * and wake res->wq; retake it before rechecking. */
		spin_unlock(&res->spinlock);
		schedule();
		spin_lock(&res->spinlock);
		goto repeat;
	}
	remove_wait_queue(&res->wq, &wait);
	__set_current_state(TASK_RUNNING);
}
0060
0061 int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
0062 {
0063 if (list_empty(&res->granted) &&
0064 list_empty(&res->converting) &&
0065 list_empty(&res->blocked))
0066 return 0;
0067 return 1;
0068 }
0069
0070
0071
0072
0073
0074 int __dlm_lockres_unused(struct dlm_lock_resource *res)
0075 {
0076 int bit;
0077
0078 assert_spin_locked(&res->spinlock);
0079
0080 if (__dlm_lockres_has_locks(res))
0081 return 0;
0082
0083
0084 if (res->inflight_locks)
0085 return 0;
0086
0087 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
0088 return 0;
0089
0090 if (res->state & (DLM_LOCK_RES_RECOVERING|
0091 DLM_LOCK_RES_RECOVERY_WAITING))
0092 return 0;
0093
0094
0095 bit = find_first_bit(res->refmap, O2NM_MAX_NODES);
0096 if (bit < O2NM_MAX_NODES)
0097 return 0;
0098
0099 return 1;
0100 }
0101
0102
0103
0104
0105
0106 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
0107 struct dlm_lock_resource *res)
0108 {
0109 assert_spin_locked(&dlm->spinlock);
0110 assert_spin_locked(&res->spinlock);
0111
0112 if (__dlm_lockres_unused(res)){
0113 if (list_empty(&res->purge)) {
0114 mlog(0, "%s: Adding res %.*s to purge list\n",
0115 dlm->name, res->lockname.len, res->lockname.name);
0116
0117 res->last_used = jiffies;
0118 dlm_lockres_get(res);
0119 list_add_tail(&res->purge, &dlm->purge_list);
0120 dlm->purge_count++;
0121 }
0122 } else if (!list_empty(&res->purge)) {
0123 mlog(0, "%s: Removing res %.*s from purge list\n",
0124 dlm->name, res->lockname.len, res->lockname.name);
0125
0126 list_del_init(&res->purge);
0127 dlm_lockres_put(res);
0128 dlm->purge_count--;
0129 }
0130 }
0131
/*
 * Locked wrapper around __dlm_lockres_calc_usage(): takes dlm->spinlock
 * then res->spinlock (required ordering) and recomputes whether @res
 * belongs on the purge list.
 */
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
			    struct dlm_lock_resource *res)
{
	spin_lock(&dlm->spinlock);
	spin_lock(&res->spinlock);

	__dlm_lockres_calc_usage(dlm, res);

	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
}
0143
0144
0145
0146
0147
0148
0149
/*
 * Finish purging a lock resource: remove it from the purge list,
 * verify it is truly unused, unhash it and drop it from the tracking
 * list.  Caller holds dlm->spinlock and res->spinlock and still owns
 * its own reference to @res after this returns.
 */
void __dlm_do_purge_lockres(struct dlm_ctxt *dlm,
			    struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purgelist\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		list_del_init(&res->purge);
		/* drop the ref taken when it was put on the purge list */
		dlm_lockres_put(res);
		dlm->purge_count--;
	}

	/* It is a bug to get here with locks, inflight refs or remote
	 * refmap bits still present. */
	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}

	__dlm_unhash_lockres(dlm, res);

	spin_lock(&dlm->track_lock);
	if (!list_empty(&res->tracking))
		list_del_init(&res->tracking);
	else {
		mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
	}
	spin_unlock(&dlm->track_lock);

	/* The resource is no longer hashed, so clear the flag that kept
	 * other paths (e.g. lookups waiting on the drop) at bay. */
	res->state &= ~DLM_LOCK_RES_DROPPING_REF;
}
0189
/*
 * Drop this node's interest in an unused lock resource.  If this node
 * is not the master, a deref message must be sent to the master first;
 * both spinlocks are dropped while doing so, so the resource state is
 * revalidated afterwards.  Called with dlm->spinlock and res->spinlock
 * held; returns with res->spinlock released and dlm->spinlock still
 * held on every path.
 */
static void dlm_purge_lockres(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res)
{
	int master;
	int ret = 0;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	master = (res->owner == dlm->node_num);

	mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
	     res->lockname.len, res->lockname.name, master);

	if (!master) {
		/* Someone else is already dropping the ref; bail out. */
		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
			mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n",
			     dlm->name, res->lockname.len, res->lockname.name);
			spin_unlock(&res->spinlock);
			return;
		}

		res->state |= DLM_LOCK_RES_DROPPING_REF;

		/* The deref is sent over the network, so drop both
		 * spinlocks for the duration. */
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);

		spin_lock(&res->spinlock);
		/* Don't send the deref while a setref is still in flight. */
		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
		spin_unlock(&res->spinlock);

		/* Tell the master we no longer reference this resource. */
		ret = dlm_drop_lockres_ref(dlm, res);
		if (ret < 0) {
			/* A dead master is handled by recovery; anything
			 * else is fatal. */
			if (!dlm_is_host_down(ret))
				BUG();
		}
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
	}

	if (!list_empty(&res->purge)) {
		mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
		     dlm->name, res->lockname.len, res->lockname.name, master);
		list_del_init(&res->purge);
		/* drop the ref taken when it was put on the purge list */
		dlm_lockres_put(res);
		dlm->purge_count--;
	}

	/* Master hasn't finished processing the deref yet; it will
	 * complete the purge later, so leave the res hashed. */
	if (!master && ret == DLM_DEREF_RESPONSE_INPROG) {
		mlog(0, "%s: deref %.*s in progress\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		spin_unlock(&res->spinlock);
		return;
	}

	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}

	__dlm_unhash_lockres(dlm, res);

	spin_lock(&dlm->track_lock);
	if (!list_empty(&res->tracking))
		list_del_init(&res->tracking);
	else {
		mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
		     res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
	}
	spin_unlock(&dlm->track_lock);

	/* Non-master set DLM_LOCK_RES_DROPPING_REF above; clear it and
	 * wake anyone waiting on that flag. */
	if (!master) {
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
	} else
		spin_unlock(&res->spinlock);
}
0275
/*
 * Walk the purge list and try to drop each unused lock resource.
 * Normally stops at the first resource used more recently than
 * DLM_PURGE_INTERVAL_MS ago (the list is kept in last_used order);
 * @purge_now skips that age check.  At most the number of entries
 * present at entry are examined, so the loop cannot livelock on
 * resources that get re-queued.
 */
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
			       int purge_now)
{
	unsigned int run_max, unused;
	unsigned long purge_jiffies;
	struct dlm_lock_resource *lockres;

	spin_lock(&dlm->spinlock);
	run_max = dlm->purge_count;

	while(run_max && !list_empty(&dlm->purge_list)) {
		run_max--;

		lockres = list_entry(dlm->purge_list.next,
				     struct dlm_lock_resource, purge);

		spin_lock(&lockres->spinlock);

		purge_jiffies = lockres->last_used +
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

		/* The head of the list is the oldest entry; if it is
		 * still too young, everything behind it is too. */
		if (!purge_now && time_after(purge_jiffies, jiffies)) {
			spin_unlock(&lockres->spinlock);
			break;
		}

		/* Resources that came back into use, are migrating, or
		 * still have assert-master work pending are moved to the
		 * back of the list and revisited later. */
		unused = __dlm_lockres_unused(lockres);
		if (!unused ||
		    (lockres->state & DLM_LOCK_RES_MIGRATING) ||
		    (lockres->inflight_assert_workers != 0)) {
			mlog(0, "%s: res %.*s is in use or being remastered, "
			     "used %d, state %d, assert master workers %u\n",
			     dlm->name, lockres->lockname.len,
			     lockres->lockname.name,
			     !unused, lockres->state,
			     lockres->inflight_assert_workers);
			list_move_tail(&lockres->purge, &dlm->purge_list);
			spin_unlock(&lockres->spinlock);
			continue;
		}

		/* Hold our own ref across the purge, which may drop and
		 * retake the locks. */
		dlm_lockres_get(lockres);

		dlm_purge_lockres(dlm, lockres);

		dlm_lockres_put(lockres);

		/* Give others a chance to grab dlm->spinlock. */
		cond_resched_lock(&dlm->spinlock);
	}

	spin_unlock(&dlm->spinlock);
}
0339
0340 static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
0341 struct dlm_lock_resource *res)
0342 {
0343 struct dlm_lock *lock, *target;
0344 int can_grant = 1;
0345
0346
0347
0348
0349
0350
0351
0352 assert_spin_locked(&dlm->ast_lock);
0353 assert_spin_locked(&res->spinlock);
0354 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
0355 DLM_LOCK_RES_RECOVERING|
0356 DLM_LOCK_RES_IN_PROGRESS)));
0357
0358 converting:
0359 if (list_empty(&res->converting))
0360 goto blocked;
0361 mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
0362 res->lockname.len, res->lockname.name);
0363
0364 target = list_entry(res->converting.next, struct dlm_lock, list);
0365 if (target->ml.convert_type == LKM_IVMODE) {
0366 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
0367 dlm->name, res->lockname.len, res->lockname.name);
0368 BUG();
0369 }
0370 list_for_each_entry(lock, &res->granted, list) {
0371 if (lock==target)
0372 continue;
0373 if (!dlm_lock_compatible(lock->ml.type,
0374 target->ml.convert_type)) {
0375 can_grant = 0;
0376
0377 if (lock->ml.highest_blocked == LKM_IVMODE) {
0378 __dlm_lockres_reserve_ast(res);
0379 __dlm_queue_bast(dlm, lock);
0380 }
0381
0382 if (lock->ml.highest_blocked < target->ml.convert_type)
0383 lock->ml.highest_blocked =
0384 target->ml.convert_type;
0385 }
0386 }
0387
0388 list_for_each_entry(lock, &res->converting, list) {
0389 if (lock==target)
0390 continue;
0391 if (!dlm_lock_compatible(lock->ml.type,
0392 target->ml.convert_type)) {
0393 can_grant = 0;
0394 if (lock->ml.highest_blocked == LKM_IVMODE) {
0395 __dlm_lockres_reserve_ast(res);
0396 __dlm_queue_bast(dlm, lock);
0397 }
0398 if (lock->ml.highest_blocked < target->ml.convert_type)
0399 lock->ml.highest_blocked =
0400 target->ml.convert_type;
0401 }
0402 }
0403
0404
0405 if (can_grant) {
0406 spin_lock(&target->spinlock);
0407 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
0408
0409 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
0410 "%d => %d, node %u\n", dlm->name, res->lockname.len,
0411 res->lockname.name,
0412 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
0413 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
0414 target->ml.type,
0415 target->ml.convert_type, target->ml.node);
0416
0417 target->ml.type = target->ml.convert_type;
0418 target->ml.convert_type = LKM_IVMODE;
0419 list_move_tail(&target->list, &res->granted);
0420
0421 BUG_ON(!target->lksb);
0422 target->lksb->status = DLM_NORMAL;
0423
0424 spin_unlock(&target->spinlock);
0425
0426 __dlm_lockres_reserve_ast(res);
0427 __dlm_queue_ast(dlm, target);
0428
0429 goto converting;
0430 }
0431
0432 blocked:
0433 if (list_empty(&res->blocked))
0434 goto leave;
0435 target = list_entry(res->blocked.next, struct dlm_lock, list);
0436
0437 list_for_each_entry(lock, &res->granted, list) {
0438 if (lock==target)
0439 continue;
0440 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
0441 can_grant = 0;
0442 if (lock->ml.highest_blocked == LKM_IVMODE) {
0443 __dlm_lockres_reserve_ast(res);
0444 __dlm_queue_bast(dlm, lock);
0445 }
0446 if (lock->ml.highest_blocked < target->ml.type)
0447 lock->ml.highest_blocked = target->ml.type;
0448 }
0449 }
0450
0451 list_for_each_entry(lock, &res->converting, list) {
0452 if (lock==target)
0453 continue;
0454 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
0455 can_grant = 0;
0456 if (lock->ml.highest_blocked == LKM_IVMODE) {
0457 __dlm_lockres_reserve_ast(res);
0458 __dlm_queue_bast(dlm, lock);
0459 }
0460 if (lock->ml.highest_blocked < target->ml.type)
0461 lock->ml.highest_blocked = target->ml.type;
0462 }
0463 }
0464
0465
0466
0467 if (can_grant) {
0468 spin_lock(&target->spinlock);
0469 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
0470
0471 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
0472 "node %u\n", dlm->name, res->lockname.len,
0473 res->lockname.name,
0474 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
0475 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
0476 target->ml.type, target->ml.node);
0477
0478
0479 list_move_tail(&target->list, &res->granted);
0480
0481 BUG_ON(!target->lksb);
0482 target->lksb->status = DLM_NORMAL;
0483
0484 spin_unlock(&target->spinlock);
0485
0486 __dlm_lockres_reserve_ast(res);
0487 __dlm_queue_ast(dlm, target);
0488
0489 goto converting;
0490 }
0491
0492 leave:
0493 return;
0494 }
0495
0496
/*
 * Mark @res dirty (if non-NULL) and wake the dlm thread so it will
 * shuffle the resource's lock queues.
 */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	if (res) {
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
		__dlm_dirty_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
	}
	wake_up(&dlm->dlm_thread_wq);
}
0508
/*
 * Put @res on the dirty list so the dlm thread will shuffle its lock
 * queues.  Only the master dirties a resource, and not while it is
 * migrating or explicitly blocked from dirtying.  The dirty list holds
 * its own reference.  Caller holds dlm->spinlock and res->spinlock.
 */
void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* don't shuffle secondary queues */
	if (res->owner == dlm->node_num) {
		if (res->state & (DLM_LOCK_RES_MIGRATING |
				  DLM_LOCK_RES_BLOCK_DIRTY))
			return;

		if (list_empty(&res->dirty)) {
			/* ref for dirty_list */
			dlm_lockres_get(res);
			list_add_tail(&res->dirty, &dlm->dirty_list);
			res->state |= DLM_LOCK_RES_DIRTY;
		}
	}

	mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
	     res->lockname.name);
}
0531
0532
0533
0534 int dlm_launch_thread(struct dlm_ctxt *dlm)
0535 {
0536 mlog(0, "Starting dlm_thread...\n");
0537
0538 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s",
0539 dlm->name);
0540 if (IS_ERR(dlm->dlm_thread_task)) {
0541 mlog_errno(PTR_ERR(dlm->dlm_thread_task));
0542 dlm->dlm_thread_task = NULL;
0543 return -EINVAL;
0544 }
0545
0546 return 0;
0547 }
0548
0549 void dlm_complete_thread(struct dlm_ctxt *dlm)
0550 {
0551 if (dlm->dlm_thread_task) {
0552 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
0553 kthread_stop(dlm->dlm_thread_task);
0554 dlm->dlm_thread_task = NULL;
0555 }
0556 }
0557
0558 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
0559 {
0560 int empty;
0561
0562 spin_lock(&dlm->spinlock);
0563 empty = list_empty(&dlm->dirty_list);
0564 spin_unlock(&dlm->spinlock);
0565
0566 return empty;
0567 }
0568
/*
 * Deliver all pending ASTs and BASTs, locally or over the network
 * depending on the lock's owning node.  dlm->ast_lock is dropped
 * around each delivery (the network send can block) and retaken to
 * continue the walk.
 */
static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
	int ret;
	struct dlm_lock *lock;
	struct dlm_lock_resource *res;
	u8 hi;

	spin_lock(&dlm->ast_lock);
	while (!list_empty(&dlm->pending_asts)) {
		lock = list_entry(dlm->pending_asts.next,
				  struct dlm_lock, ast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;
		mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
		     "node %u\n", dlm->name, res->lockname.len,
		     res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     lock->ml.type, lock->ml.node);

		BUG_ON(!lock->ast_pending);

		/* remove from list (including ref) before dropping the
		 * ast_lock so a racing requeue sees an empty ast_list */
		list_del_init(&lock->ast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_do_remote_ast(dlm, res, lock);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_ast(dlm, res, lock);

		spin_lock(&dlm->ast_lock);

		/* possible that another ast was queued while we were
		 * delivering this one; if so, leave ast_pending set so
		 * the new entry gets flushed too */
		if (!list_empty(&lock->ast_list)) {
			mlog(0, "%s: res %.*s, AST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->ast_pending = 0;

		/* drop the extra ref taken at the top of this iteration,
		 * and release the ast reservation on the resource */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}

	while (!list_empty(&dlm->pending_basts)) {
		lock = list_entry(dlm->pending_basts.next,
				  struct dlm_lock, bast_list);
		/* get an extra ref on lock */
		dlm_lock_get(lock);
		res = lock->lockres;

		BUG_ON(!lock->bast_pending);

		/* snapshot and reset highest_blocked under the lock's
		 * own spinlock so new blockers can start accumulating */
		spin_lock(&lock->spinlock);
		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
		hi = lock->ml.highest_blocked;
		lock->ml.highest_blocked = LKM_IVMODE;
		spin_unlock(&lock->spinlock);

		/* remove from list (including ref) before dropping the
		 * ast_lock, as with ASTs above */
		list_del_init(&lock->bast_list);
		dlm_lock_put(lock);
		spin_unlock(&dlm->ast_lock);

		mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
		     "blocked %d, node %u\n",
		     dlm->name, res->lockname.len, res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     hi, lock->ml.node);

		if (lock->ml.node != dlm->node_num) {
			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
			if (ret < 0)
				mlog_errno(ret);
		} else
			dlm_do_local_bast(dlm, res, lock, hi);

		spin_lock(&dlm->ast_lock);

		/* possible that another bast was queued while we were
		 * delivering this one; same handling as for ASTs */
		if (!list_empty(&lock->bast_list)) {
			mlog(0, "%s: res %.*s, BAST queued while flushing last "
			     "one\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		} else
			lock->bast_pending = 0;

		/* drop the extra ref and the ast reservation */
		dlm_lock_put(lock);
		dlm_lockres_release_ast(dlm, res);
	}
	wake_up(&dlm->ast_wq);
	spin_unlock(&dlm->ast_lock);
}
0675
0676
0677 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
0678 #define DLM_THREAD_MAX_DIRTY 100
0679
/*
 * Main loop of the per-domain dlm thread.  Each pass runs the purge
 * list, shuffles up to DLM_THREAD_MAX_DIRTY dirty lock resources,
 * flushes pending ASTs/BASTs, then sleeps until new dirty work arrives
 * or the timeout elapses.  Exits when kthread_stop() is called.
 */
static int dlm_thread(void *data)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		int n = DLM_THREAD_MAX_DIRTY;

		/* dlm_shutting_down is very point-in-time, but that
		 * doesn't matter as we'll just loop back around if we
		 * get false on the leading edge of a state
		 * transition. */
		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

		/* We really don't want to hold dlm->spinlock while
		 * calling dlm_shuffle_lists on each lockres that
		 * needs to have its queues adjusted and AST/BASTs
		 * run.  So let's pull each entry off the dirty_list
		 * and drop dlm->spinlock ASAP.  Once off the list,
		 * res->spinlock needs to be taken again to protect
		 * the queues while calling dlm_shuffle_lists.  */
		spin_lock(&dlm->spinlock);
		while (!list_empty(&dlm->dirty_list)) {
			int delay = 0;
			res = list_entry(dlm->dirty_list.next,
					 struct dlm_lock_resource, dirty);

			/* peel a lockres off, remove it from the list,
			 * unset the dirty flag and drop the dlm lock */
			BUG_ON(!res);
			dlm_lockres_get(res);

			spin_lock(&res->spinlock);
			/* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
			list_del_init(&res->dirty);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);
			/* Drop dirty_list ref */
			dlm_lockres_put(res);

			/* lockres can be re-dirtied/re-added to the
			 * dirty_list in this gap, but that is ok */

			spin_lock(&dlm->ast_lock);
			spin_lock(&res->spinlock);
			if (res->owner != dlm->node_num) {
				__dlm_print_one_lock_resource(res);
				mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
				     " dirty %d\n", dlm->name,
				     !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
				     !!(res->state & DLM_LOCK_RES_MIGRATING),
				     !!(res->state & DLM_LOCK_RES_RECOVERING),
				     !!(res->state & DLM_LOCK_RES_DIRTY));
			}
			BUG_ON(res->owner != dlm->node_num);

			/* it is now ok to move lockreses in these states
			 * to the dirty list, assuming that they will only be
			 * dirty for a short while. */
			BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
					  DLM_LOCK_RES_RECOVERING |
					  DLM_LOCK_RES_RECOVERY_WAITING)) {
				/* move it to the tail and keep going */
				res->state &= ~DLM_LOCK_RES_DIRTY;
				spin_unlock(&res->spinlock);
				spin_unlock(&dlm->ast_lock);
				mlog(0, "%s: res %.*s, inprogress, delay list "
				     "shuffle, state %d\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     res->state);
				delay = 1;
				goto in_progress;
			}

			/* at this point the lockres is not migrating/
			 * recovering/in-progress.  we have the lockres
			 * spinlock and do NOT have the dlm lock.
			 * safe to reserve/queue asts and run the lists. */

			/* called while holding lockres lock */
			dlm_shuffle_lists(dlm, res);
			res->state &= ~DLM_LOCK_RES_DIRTY;
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->ast_lock);

			dlm_lockres_calc_usage(dlm, res);

in_progress:

			spin_lock(&dlm->spinlock);
			/* if the lock was in-progress, stick
			 * it on the back of the list */
			if (delay) {
				spin_lock(&res->spinlock);
				__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
			}
			dlm_lockres_put(res);

			/* unlikely, but we may need to give time to
			 * other tasks */
			if (!--n) {
				mlog(0, "%s: Throttling dlm thread\n",
				     dlm->name);
				break;
			}
		}

		spin_unlock(&dlm->spinlock);
		dlm_flush_asts(dlm);

		/* yield and continue right away if there is more work to do */
		if (!n) {
			cond_resched();
			continue;
		}

		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
						 !dlm_dirty_list_empty(dlm) ||
						 kthread_should_stop(),
						 timeout);
	}

	mlog(0, "quitting DLM thread\n");
	return 0;
}