#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"

/* If the start for which we're re-enabling locking (seq) has been superseded
 * by a newer stop (ls_recover_seq), leave locking disabled.
 *
 * We also take ls_recv_active here to close the race where dlm_recv a) sees
 * locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
 * enables locking and clears the requestqueue between a and b.
 */
static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
        int error = -EINTR;

        down_write(&ls->ls_recv_active);

        spin_lock(&ls->ls_recover_lock);
        if (ls->ls_recover_seq == seq) {
                set_bit(LSFL_RUNNING, &ls->ls_flags);
                /* unblocks processes waiting to enter the dlm */
                up_write(&ls->ls_in_recovery);
                clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                error = 0;
        }
        spin_unlock(&ls->ls_recover_lock);

        up_write(&ls->ls_recv_active);
        return error;
}

static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
        unsigned long start;
        int error, neg = 0;

        log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

        mutex_lock(&ls->ls_recoverd_active);

        dlm_callback_suspend(ls);

        dlm_clear_toss(ls);

        /*
         * This list of root rsb's will be the basis of most of the recovery
         * routines.
         */

        dlm_create_root_list(ls);

        /*
         * Add or remove nodes from the lockspace's ls_nodes list.
         */

        error = dlm_recover_members(ls, rv, &neg);
        if (error) {
                log_rinfo(ls, "dlm_recover_members error %d", error);
                goto fail;
        }

        dlm_recover_dir_nodeid(ls);

        ls->ls_recover_dir_sent_res = 0;
        ls->ls_recover_dir_sent_msg = 0;
        ls->ls_recover_locks_in = 0;

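        /*
         * Each recovery phase ends with a status barrier:
         * dlm_set_recover_status() records locally that this node has
         * finished the phase, and the matching *_wait() call returns once
         * all lockspace members have reached the same status (or returns an
         * error if recovery is stopped again).
         */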
        dlm_set_recover_status(ls, DLM_RS_NODES);

        error = dlm_recover_members_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_members_wait error %d", error);
                goto fail;
        }

        start = jiffies;

        /*
         * Rebuild our own share of the resource directory by collecting from
         * all other nodes the master rsb names that hash to us.
         */

        error = dlm_recover_directory(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory error %d", error);
                goto fail;
        }

        dlm_set_recover_status(ls, DLM_RS_DIR);

        error = dlm_recover_directory_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
                goto fail;
        }

        log_rinfo(ls, "dlm_recover_directory %u out %u messages",
                  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);

        /*
         * We may have outstanding operations that are waiting for a reply
         * from a failed node.  Mark these to be resent after recovery.
         * Unlock and cancel ops can just be completed.
         */

        dlm_recover_waiters_pre(ls);

        if (dlm_recovery_stopped(ls)) {
                error = -EINTR;
                goto fail;
        }

        if (neg || dlm_no_directory(ls)) {
                /*
                 * Clear lkb's for departed nodes.
                 */

                dlm_recover_purge(ls);

                /*
                 * Get new master nodeid's for rsb's that were mastered on
                 * departed nodes.
                 */

                error = dlm_recover_masters(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_masters error %d", error);
                        goto fail;
                }

                /*
                 * Send our locks on remastered rsb's to the new masters.
                 */

                error = dlm_recover_locks(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks error %d", error);
                        goto fail;
                }

                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }

                log_rinfo(ls, "dlm_recover_locks %u in",
                          ls->ls_recover_locks_in);

                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
                 * settings.
                 */

                dlm_recover_rsbs(ls);
        } else {
                /*
                 * Other lockspace members may be going through the "neg"
                 * steps while also adding us to the lockspace, in which case
                 * they will be doing the recover_locks (RS_LOCKS) barrier.
                 */
                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }
        }

        dlm_release_root_list(ls);

        /*
         * Purge directory-related requests that are saved in requestqueue.
         * All dir requests from before recovery are invalid now due to the
         * dir rebuild and will be resent by the requesting nodes.
         */

        dlm_purge_requestqueue(ls);

        dlm_set_recover_status(ls, DLM_RS_DONE);

        error = dlm_recover_done_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_done_wait error %d", error);
                goto fail;
        }

        dlm_clear_members_gone(ls);

        dlm_adjust_timeouts(ls);

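        /* resume delivery of the callbacks that were queued while callbacks
         * were suspended by dlm_callback_suspend() at the start of recovery */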
        dlm_callback_resume(ls);

        error = enable_locking(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "enable_locking error %d", error);
                goto fail;
        }

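        /* process the messages that were saved on the requestqueue while
         * locking was stopped (see enable_locking() above) */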
        error = dlm_process_requestqueue(ls);
        if (error) {
                log_rinfo(ls, "dlm_process_requestqueue error %d", error);
                goto fail;
        }

        error = dlm_recover_waiters_post(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
                goto fail;
        }

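        /* grant locks that have become grantable now that the locks held by
         * departed nodes have been purged */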
        dlm_recover_grant(ls);

        log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
                  jiffies_to_msecs(jiffies - start));
        mutex_unlock(&ls->ls_recoverd_active);

        return 0;

 fail:
        dlm_release_root_list(ls);
        mutex_unlock(&ls->ls_recoverd_active);

        return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
 * stopped via dlm_ls_stop(); in that case we need to leave the RECOVER_STOP
 * flag set.
 */

static void do_ls_recovery(struct dlm_ls *ls)
{
        struct dlm_recover *rv = NULL;
        int error;

        spin_lock(&ls->ls_recover_lock);
        rv = ls->ls_recover_args;
        ls->ls_recover_args = NULL;
        if (rv && ls->ls_recover_seq == rv->seq)
                clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
        spin_unlock(&ls->ls_recover_lock);

        if (rv) {
                error = ls_recover(ls, rv);
                switch (error) {
                case 0:
                        ls->ls_recovery_result = 0;
                        complete(&ls->ls_recovery_done);

                        dlm_lsop_recover_done(ls);
                        break;
                case -EINTR:
                        /* if recovery was interrupted with -EINTR, wait for
                         * the next ls_recover() iteration until it hopefully
                         * succeeds.
                         */
                        log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
                                  __func__, (unsigned long long)rv->seq);
                        break;
                default:
                        log_rinfo(ls, "%s %llu error %d", __func__,
                                  (unsigned long long)rv->seq, error);

                        /* let dlm_new_lockspace() see the critical error */
                        ls->ls_recovery_result = error;
                        complete(&ls->ls_recovery_done);
                        break;
                }

                kfree(rv->nodes);
                kfree(rv);
        }
}

static int dlm_recoverd(void *arg)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(arg);
        if (!ls) {
                log_print("dlm_recoverd: no lockspace %p", arg);
                return -1;
        }

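        /* locking starts out stopped: hold ls_in_recovery (noted by
         * LSFL_RECOVER_LOCK) until recovery completes and enable_locking()
         * releases it */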
        down_write(&ls->ls_in_recovery);
        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
        wake_up(&ls->ls_recover_lock_wait);

        while (1) {
                /*
                 * We call kthread_should_stop() after set_current_state().
                 * This is because it works correctly if kthread_stop() is
                 * called just before set_current_state().
                 */
                set_current_state(TASK_INTERRUPTIBLE);
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        break;
                }
                if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
                    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        if (kthread_should_stop())
                                break;
                        schedule();
                }
                set_current_state(TASK_RUNNING);

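                /* dlm_ls_stop() set RECOVER_DOWN to ask us to stop locking:
                 * retake ls_in_recovery and note that we hold it */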
                if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        down_write(&ls->ls_in_recovery);
                        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                        wake_up(&ls->ls_recover_lock_wait);
                }

                if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
                        do_ls_recovery(ls);
        }

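        /* if we are exiting while locking is still stopped, drop the
         * ls_in_recovery rwsem we hold so it is not left held by a dead
         * thread */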
        if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
                up_write(&ls->ls_in_recovery);

        dlm_put_lockspace(ls);
        return 0;
}

int dlm_recoverd_start(struct dlm_ls *ls)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                ls->ls_recoverd_task = p;
        return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
        kthread_stop(ls->ls_recoverd_task);
}

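/* dlm_recoverd_suspend() wakes any recovery wait on ls_wait_general and then
 * blocks on ls_recoverd_active until a running ls_recover() finishes; no new
 * recovery can start until dlm_recoverd_resume() releases the mutex. */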
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
        wake_up(&ls->ls_wait_general);
        mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
        mutex_unlock(&ls->ls_recoverd_active);
}