// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
    int error = -EINTR;

    down_write(&ls->ls_recv_active);

    spin_lock(&ls->ls_recover_lock);
    if (ls->ls_recover_seq == seq) {
        set_bit(LSFL_RUNNING, &ls->ls_flags);
        /* unblocks processes waiting to enter the dlm */
        up_write(&ls->ls_in_recovery);
        clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
        error = 0;
    }
    spin_unlock(&ls->ls_recover_lock);

    up_write(&ls->ls_recv_active);
    return error;
}

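/* One pass of the recovery sequence for a membership change: update the
   member list, rebuild the resource directory, and, when nodes departed,
   purge their locks and remaster their rsbs.  Each phase ends with a
   dlm_set_recover_status()/..._wait() pair that acts as a cluster-wide
   barrier, so all members finish a phase before any of them moves on. */
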
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
    unsigned long start;
    int error, neg = 0;

    log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

    mutex_lock(&ls->ls_recoverd_active);

    dlm_callback_suspend(ls);

    dlm_clear_toss(ls);

    /*
     * This list of root rsb's will be the basis of most of the recovery
     * routines.
     */

    dlm_create_root_list(ls);

    /*
     * Add or remove nodes from the lockspace's ls_nodes list.
     *
     * Because all membership changes must be reported to the lsops or
     * midcomms layer, ls_recover() must not be aborted until this is
     * done.
     */

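    /* neg is set to the number of nodes that have left the lockspace;
       a nonzero count selects the purge/remaster path below */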
    error = dlm_recover_members(ls, rv, &neg);
    if (error) {
        log_rinfo(ls, "dlm_recover_members error %d", error);
        goto fail;
    }

    dlm_recover_dir_nodeid(ls);

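    /* reset the counters reported in the log messages below */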
    ls->ls_recover_dir_sent_res = 0;
    ls->ls_recover_dir_sent_msg = 0;
    ls->ls_recover_locks_in = 0;

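    /* members barrier: wait for every node to reach DLM_RS_NODES */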
    dlm_set_recover_status(ls, DLM_RS_NODES);

    error = dlm_recover_members_wait(ls);
    if (error) {
        log_rinfo(ls, "dlm_recover_members_wait error %d", error);
        goto fail;
    }

    start = jiffies;

    /*
     * Rebuild our own share of the directory by collecting from all other
     * nodes their master rsb names that hash to us.
     */

    error = dlm_recover_directory(ls);
    if (error) {
        log_rinfo(ls, "dlm_recover_directory error %d", error);
        goto fail;
    }

    dlm_set_recover_status(ls, DLM_RS_DIR);

    error = dlm_recover_directory_wait(ls);
    if (error) {
        log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
        goto fail;
    }

    log_rinfo(ls, "dlm_recover_directory %u out %u messages",
          ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);

    /*
     * We may have outstanding operations that are waiting for a reply from
     * a failed node.  Mark these to be resent after recovery.  Unlock and
     * cancel ops can just be completed.
     */

    dlm_recover_waiters_pre(ls);

    if (dlm_recovery_stopped(ls)) {
        error = -EINTR;
        goto fail;
    }

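    /* Purging and remastering are only needed when nodes have departed
       (neg) or when the lockspace runs without a resource directory. */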
    if (neg || dlm_no_directory(ls)) {
        /*
         * Clear lkb's for departed nodes.
         */

        dlm_recover_purge(ls);

        /*
         * Get new master nodeid's for rsb's that were mastered on
         * departed nodes.
         */

        error = dlm_recover_masters(ls);
        if (error) {
            log_rinfo(ls, "dlm_recover_masters error %d", error);
            goto fail;
        }

        /*
         * Send our locks on remastered rsb's to the new masters.
         */

        error = dlm_recover_locks(ls);
        if (error) {
            log_rinfo(ls, "dlm_recover_locks error %d", error);
            goto fail;
        }

        dlm_set_recover_status(ls, DLM_RS_LOCKS);

        error = dlm_recover_locks_wait(ls);
        if (error) {
            log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
            goto fail;
        }

        log_rinfo(ls, "dlm_recover_locks %u in",
              ls->ls_recover_locks_in);

        /*
         * Finalize state in master rsb's now that all locks can be
         * checked.  This includes conversion resolution and lvb
         * settings.
         */

        dlm_recover_rsbs(ls);
    } else {
        /*
         * Other lockspace members may be going through the "neg" steps
         * while also adding us to the lockspace, in which case they'll
         * be doing the recover_locks (RS_LOCKS) barrier.
         */
        dlm_set_recover_status(ls, DLM_RS_LOCKS);

        error = dlm_recover_locks_wait(ls);
        if (error) {
            log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
            goto fail;
        }
    }

    dlm_release_root_list(ls);

    /*
     * Purge directory-related requests that are saved in requestqueue.
     * All dir requests from before recovery are invalid now due to the dir
     * rebuild and will be resent by the requesting nodes.
     */

    dlm_purge_requestqueue(ls);

    dlm_set_recover_status(ls, DLM_RS_DONE);

    error = dlm_recover_done_wait(ls);
    if (error) {
        log_rinfo(ls, "dlm_recover_done_wait error %d", error);
        goto fail;
    }

    dlm_clear_members_gone(ls);

    dlm_adjust_timeouts(ls);

    dlm_callback_resume(ls);

    error = enable_locking(ls, rv->seq);
    if (error) {
        log_rinfo(ls, "enable_locking error %d", error);
        goto fail;
    }

    error = dlm_process_requestqueue(ls);
    if (error) {
        log_rinfo(ls, "dlm_process_requestqueue error %d", error);
        goto fail;
    }

    error = dlm_recover_waiters_post(ls);
    if (error) {
        log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
        goto fail;
    }

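    /* locking is enabled again; re-run grant processing for requests
       that may have become grantable while recovery held them up */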
    dlm_recover_grant(ls);

    log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
          (unsigned long long)rv->seq, ls->ls_generation,
          jiffies_to_msecs(jiffies - start));
    mutex_unlock(&ls->ls_recoverd_active);

    return 0;

 fail:
    dlm_release_root_list(ls);
    mutex_unlock(&ls->ls_recoverd_active);

    return error;
}


/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the
   LSFL_RECOVER_STOP flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
    struct dlm_recover *rv = NULL;
    int error;

    spin_lock(&ls->ls_recover_lock);
    rv = ls->ls_recover_args;
    ls->ls_recover_args = NULL;
    if (rv && ls->ls_recover_seq == rv->seq)
        clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
    spin_unlock(&ls->ls_recover_lock);

    if (rv) {
        error = ls_recover(ls, rv);
        switch (error) {
        case 0:
            ls->ls_recovery_result = 0;
            complete(&ls->ls_recovery_done);

            dlm_lsop_recover_done(ls);
            break;
        case -EINTR:
            /* recovery was interrupted; wait for the next
             * ls_recover() iteration, which will hopefully succeed.
             */
            log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
                  __func__, (unsigned long long)rv->seq);
            break;
        default:
            log_rinfo(ls, "%s %llu error %d", __func__,
                  (unsigned long long)rv->seq, error);

            /* let new_lockspace() know about the critical error */
            ls->ls_recovery_result = error;
            complete(&ls->ls_recovery_done);
            break;
        }

        kfree(rv->nodes);
        kfree(rv);
    }
}

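/* Per-lockspace recovery thread.  It sleeps until the lockspace stop/start
   paths set LSFL_RECOVER_DOWN or LSFL_RECOVER_WORK and wake it, then blocks
   new lock activity (ls_in_recovery) and runs the recovery sequence. */
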
static int dlm_recoverd(void *arg)
{
    struct dlm_ls *ls;

    ls = dlm_find_lockspace_local(arg);
    if (!ls) {
        log_print("dlm_recoverd: no lockspace %p", arg);
        return -1;
    }

    down_write(&ls->ls_in_recovery);
    set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
    wake_up(&ls->ls_recover_lock_wait);

    while (1) {
        /*
         * We call kthread_should_stop() after set_current_state().
         * This is because it works correctly if kthread_stop() is
         * called just before set_current_state().
         */
        set_current_state(TASK_INTERRUPTIBLE);
        if (kthread_should_stop()) {
            set_current_state(TASK_RUNNING);
            break;
        }
        if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
            !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
            if (kthread_should_stop())
                break;
            schedule();
        }
        set_current_state(TASK_RUNNING);

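        /* RECOVER_DOWN: a stop was requested; block new lock activity
           until a later recovery pass re-enables it via enable_locking() */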
        if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
            down_write(&ls->ls_in_recovery);
            set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
            wake_up(&ls->ls_recover_lock_wait);
        }

        if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
            do_ls_recovery(ls);
    }

    if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
        up_write(&ls->ls_in_recovery);

    dlm_put_lockspace(ls);
    return 0;
}

int dlm_recoverd_start(struct dlm_ls *ls)
{
    struct task_struct *p;
    int error = 0;

    p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
    if (IS_ERR(p))
        error = PTR_ERR(p);
    else
        ls->ls_recoverd_task = p;
    return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
    kthread_stop(ls->ls_recoverd_task);
}

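/* Hold ls_recoverd_active, the mutex ls_recover() runs under, so that no
   recovery pass can run between suspend and resume. */
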
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
    wake_up(&ls->ls_wait_general);
    mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
    mutex_unlock(&ls->ls_recoverd_active);
}