Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
0004  * Copyright 2004-2011 Red Hat, Inc.
0005  */
0006 
0007 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
0008 
0009 #include <linux/fs.h>
0010 #include <linux/dlm.h>
0011 #include <linux/slab.h>
0012 #include <linux/types.h>
0013 #include <linux/delay.h>
0014 #include <linux/gfs2_ondisk.h>
0015 #include <linux/sched/signal.h>
0016 
0017 #include "incore.h"
0018 #include "glock.h"
0019 #include "glops.h"
0020 #include "recovery.h"
0021 #include "util.h"
0022 #include "sys.h"
0023 #include "trace_gfs2.h"
0024 
0025 /**
0026  * gfs2_update_stats - Update time based stats
0027  * @s: The stats to update (local or global)
0028  * @index: The index inside @s
0029  * @sample: New data to include
0030  */
0031 static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
0032                      s64 sample)
0033 {
0034     /*
0035      * @delta is the difference between the current rtt sample and the
0036      * running average srtt. We add 1/8 of that to the srtt in order to
0037      * update the current srtt estimate. The variance estimate is a bit
0038      * more complicated. We subtract the current variance estimate from
0039      * the abs value of the @delta and add 1/4 of that to the running
0040      * total.  That's equivalent to 3/4 of the current variance
0041      * estimate plus 1/4 of the abs of @delta.
0042      *
0043      * Note that the index points at the array entry containing the
0044      * smoothed mean value, and the variance is always in the following
0045      * entry
0046      *
0047      * Reference: TCP/IP Illustrated, vol 2, p. 831,832
0048      * All times are in units of integer nanoseconds. Unlike the TCP/IP
0049      * case, they are not scaled fixed point.
0050      */
0051 
0052     s64 delta = sample - s->stats[index];
0053     s->stats[index] += (delta >> 3);
0054     index++;
0055     s->stats[index] += (s64)(abs(delta) - s->stats[index]) >> 2;
0056 }
0057 
0058 /**
0059  * gfs2_update_reply_times - Update locking statistics
0060  * @gl: The glock to update
0061  *
0062  * This assumes that gl->gl_dstamp has been set earlier.
0063  *
0064  * The rtt (lock round trip time) is an estimate of the time
0065  * taken to perform a dlm lock request. We update it on each
0066  * reply from the dlm.
0067  *
0068  * The blocking flag is set on the glock for all dlm requests
0069  * which may potentially block due to lock requests from other nodes.
0070  * DLM requests where the current lock state is exclusive, the
0071  * requested state is null (or unlocked) or where the TRY or
0072  * TRY_1CB flags are set are classified as non-blocking. All
0073  * other DLM requests are counted as (potentially) blocking.
0074  */
0075 static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
0076 {
0077     struct gfs2_pcpu_lkstats *lks;
0078     const unsigned gltype = gl->gl_name.ln_type;
0079     unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
0080              GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
0081     s64 rtt;
0082 
0083     preempt_disable();
0084     rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
0085     lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
0086     gfs2_update_stats(&gl->gl_stats, index, rtt);       /* Local */
0087     gfs2_update_stats(&lks->lkstats[gltype], index, rtt);   /* Global */
0088     preempt_enable();
0089 
0090     trace_gfs2_glock_lock_time(gl, rtt);
0091 }
0092 
0093 /**
0094  * gfs2_update_request_times - Update locking statistics
0095  * @gl: The glock to update
0096  *
0097  * The irt (lock inter-request times) measures the average time
0098  * between requests to the dlm. It is updated immediately before
0099  * each dlm call.
0100  */
0101 
0102 static inline void gfs2_update_request_times(struct gfs2_glock *gl)
0103 {
0104     struct gfs2_pcpu_lkstats *lks;
0105     const unsigned gltype = gl->gl_name.ln_type;
0106     ktime_t dstamp;
0107     s64 irt;
0108 
0109     preempt_disable();
0110     dstamp = gl->gl_dstamp;
0111     gl->gl_dstamp = ktime_get_real();
0112     irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
0113     lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
0114     gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);       /* Local */
0115     gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);   /* Global */
0116     preempt_enable();
0117 }
0118  
0119 static void gdlm_ast(void *arg)
0120 {
0121     struct gfs2_glock *gl = arg;
0122     unsigned ret = gl->gl_state;
0123 
0124     gfs2_update_reply_times(gl);
0125     BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);
0126 
0127     if ((gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID) && gl->gl_lksb.sb_lvbptr)
0128         memset(gl->gl_lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
0129 
0130     switch (gl->gl_lksb.sb_status) {
0131     case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
0132         if (gl->gl_ops->go_free)
0133             gl->gl_ops->go_free(gl);
0134         gfs2_glock_free(gl);
0135         return;
0136     case -DLM_ECANCEL: /* Cancel while getting lock */
0137         ret |= LM_OUT_CANCELED;
0138         goto out;
0139     case -EAGAIN: /* Try lock fails */
0140     case -EDEADLK: /* Deadlock detected */
0141         goto out;
0142     case -ETIMEDOUT: /* Canceled due to timeout */
0143         ret |= LM_OUT_ERROR;
0144         goto out;
0145     case 0: /* Success */
0146         break;
0147     default: /* Something unexpected */
0148         BUG();
0149     }
0150 
0151     ret = gl->gl_req;
0152     if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
0153         if (gl->gl_req == LM_ST_SHARED)
0154             ret = LM_ST_DEFERRED;
0155         else if (gl->gl_req == LM_ST_DEFERRED)
0156             ret = LM_ST_SHARED;
0157         else
0158             BUG();
0159     }
0160 
0161     set_bit(GLF_INITIAL, &gl->gl_flags);
0162     gfs2_glock_complete(gl, ret);
0163     return;
0164 out:
0165     if (!test_bit(GLF_INITIAL, &gl->gl_flags))
0166         gl->gl_lksb.sb_lkid = 0;
0167     gfs2_glock_complete(gl, ret);
0168 }
0169 
0170 static void gdlm_bast(void *arg, int mode)
0171 {
0172     struct gfs2_glock *gl = arg;
0173 
0174     switch (mode) {
0175     case DLM_LOCK_EX:
0176         gfs2_glock_cb(gl, LM_ST_UNLOCKED);
0177         break;
0178     case DLM_LOCK_CW:
0179         gfs2_glock_cb(gl, LM_ST_DEFERRED);
0180         break;
0181     case DLM_LOCK_PR:
0182         gfs2_glock_cb(gl, LM_ST_SHARED);
0183         break;
0184     default:
0185         fs_err(gl->gl_name.ln_sbd, "unknown bast mode %d\n", mode);
0186         BUG();
0187     }
0188 }
0189 
0190 /* convert gfs lock-state to dlm lock-mode */
0191 
0192 static int make_mode(struct gfs2_sbd *sdp, const unsigned int lmstate)
0193 {
0194     switch (lmstate) {
0195     case LM_ST_UNLOCKED:
0196         return DLM_LOCK_NL;
0197     case LM_ST_EXCLUSIVE:
0198         return DLM_LOCK_EX;
0199     case LM_ST_DEFERRED:
0200         return DLM_LOCK_CW;
0201     case LM_ST_SHARED:
0202         return DLM_LOCK_PR;
0203     }
0204     fs_err(sdp, "unknown LM state %d\n", lmstate);
0205     BUG();
0206     return -1;
0207 }
0208 
0209 static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
0210               const int req)
0211 {
0212     u32 lkf = 0;
0213 
0214     if (gl->gl_lksb.sb_lvbptr)
0215         lkf |= DLM_LKF_VALBLK;
0216 
0217     if (gfs_flags & LM_FLAG_TRY)
0218         lkf |= DLM_LKF_NOQUEUE;
0219 
0220     if (gfs_flags & LM_FLAG_TRY_1CB) {
0221         lkf |= DLM_LKF_NOQUEUE;
0222         lkf |= DLM_LKF_NOQUEUEBAST;
0223     }
0224 
0225     if (gfs_flags & LM_FLAG_PRIORITY) {
0226         lkf |= DLM_LKF_NOORDER;
0227         lkf |= DLM_LKF_HEADQUE;
0228     }
0229 
0230     if (gfs_flags & LM_FLAG_ANY) {
0231         if (req == DLM_LOCK_PR)
0232             lkf |= DLM_LKF_ALTCW;
0233         else if (req == DLM_LOCK_CW)
0234             lkf |= DLM_LKF_ALTPR;
0235         else
0236             BUG();
0237     }
0238 
0239     if (gl->gl_lksb.sb_lkid != 0) {
0240         lkf |= DLM_LKF_CONVERT;
0241         if (test_bit(GLF_BLOCKING, &gl->gl_flags))
0242             lkf |= DLM_LKF_QUECVT;
0243     }
0244 
0245     return lkf;
0246 }
0247 
0248 static void gfs2_reverse_hex(char *c, u64 value)
0249 {
0250     *c = '0';
0251     while (value) {
0252         *c-- = hex_asc[value & 0x0f];
0253         value >>= 4;
0254     }
0255 }
0256 
0257 static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
0258              unsigned int flags)
0259 {
0260     struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
0261     int req;
0262     u32 lkf;
0263     char strname[GDLM_STRNAME_BYTES] = "";
0264     int error;
0265 
0266     req = make_mode(gl->gl_name.ln_sbd, req_state);
0267     lkf = make_flags(gl, flags, req);
0268     gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
0269     gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
0270     if (gl->gl_lksb.sb_lkid) {
0271         gfs2_update_request_times(gl);
0272     } else {
0273         memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
0274         strname[GDLM_STRNAME_BYTES - 1] = '\0';
0275         gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
0276         gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
0277         gl->gl_dstamp = ktime_get_real();
0278     }
0279     /*
0280      * Submit the actual lock request.
0281      */
0282 
0283 again:
0284     error = dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
0285             GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
0286     if (error == -EBUSY) {
0287         msleep(20);
0288         goto again;
0289     }
0290     return error;
0291 }
0292 
0293 static void gdlm_put_lock(struct gfs2_glock *gl)
0294 {
0295     struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
0296     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0297     int error;
0298 
0299     if (gl->gl_lksb.sb_lkid == 0) {
0300         gfs2_glock_free(gl);
0301         return;
0302     }
0303 
0304     clear_bit(GLF_BLOCKING, &gl->gl_flags);
0305     gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
0306     gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
0307     gfs2_update_request_times(gl);
0308 
0309     /* don't want to call dlm if we've unmounted the lock protocol */
0310     if (test_bit(DFL_UNMOUNT, &ls->ls_recover_flags)) {
0311         gfs2_glock_free(gl);
0312         return;
0313     }
0314     /* don't want to skip dlm_unlock writing the lvb when lock has one */
0315 
0316     if (test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags) &&
0317         !gl->gl_lksb.sb_lvbptr) {
0318         gfs2_glock_free(gl);
0319         return;
0320     }
0321 
0322 again:
0323     error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
0324                NULL, gl);
0325     if (error == -EBUSY) {
0326         msleep(20);
0327         goto again;
0328     }
0329 
0330     if (error) {
0331         fs_err(sdp, "gdlm_unlock %x,%llx err=%d\n",
0332                gl->gl_name.ln_type,
0333                (unsigned long long)gl->gl_name.ln_number, error);
0334         return;
0335     }
0336 }
0337 
0338 static void gdlm_cancel(struct gfs2_glock *gl)
0339 {
0340     struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
0341     dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
0342 }
0343 
0344 /*
0345  * dlm/gfs2 recovery coordination using dlm_recover callbacks
0346  *
0347  *  0. gfs2 checks for another cluster node withdraw, needing journal replay
0348  *  1. dlm_controld sees lockspace members change
0349  *  2. dlm_controld blocks dlm-kernel locking activity
0350  *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
0351  *  4. dlm_controld starts and finishes its own user level recovery
0352  *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
0353  *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
0354  *  7. dlm_recoverd does its own lock recovery
0355  *  8. dlm_recoverd unblocks dlm-kernel locking activity
0356  *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
0357  * 10. gfs2_control updates control_lock lvb with new generation and jid bits
0358  * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
0359  * 12. gfs2_recover dequeues and recovers journals of failed nodes
0360  * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
0361  * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
0362  * 15. gfs2_control unblocks normal locking when all journals are recovered
0363  *
0364  * - failures during recovery
0365  *
0366  * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
0367  * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
0368  * recovering for a prior failure.  gfs2_control needs a way to detect
0369  * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
0370  * the recover_block and recover_start values.
0371  *
0372  * recover_done() provides a new lockspace generation number each time it
0373  * is called (step 9).  This generation number is saved as recover_start.
0374  * When recover_prep() is called, it sets BLOCK_LOCKS and sets
0375  * recover_block = recover_start.  So, while recover_block is equal to
0376  * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
0377  * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
0378  *
0379  * - more specific gfs2 steps in sequence above
0380  *
0381  *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
0382  *  6. recover_slot records any failed jids (maybe none)
0383  *  9. recover_done sets recover_start = new generation number
0384  * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
0385  * 12. gfs2_recover does journal recoveries for failed jids identified above
0386  * 14. gfs2_control clears control_lock lvb bits for recovered jids
0387  * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
0388  *     again) then do nothing, otherwise if recover_start > recover_block
0389  *     then clear BLOCK_LOCKS.
0390  *
0391  * - parallel recovery steps across all nodes
0392  *
0393  * All nodes attempt to update the control_lock lvb with the new generation
0394  * number and jid bits, but only the first to get the control_lock EX will
0395  * do so; others will see that it's already done (lvb already contains new
0396  * generation number.)
0397  *
0398  * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
0399  * . All nodes attempt to set control_lock lvb gen + bits for the new gen
0400  * . One node gets control_lock first and writes the lvb, others see it's done
0401  * . All nodes attempt to recover jids for which they see control_lock bits set
0402  * . One node succeeds for a jid, and that one clears the jid bit in the lvb
0403  * . All nodes will eventually see all lvb bits clear and unblock locks
0404  *
0405  * - is there a problem with clearing an lvb bit that should be set
0406  *   and missing a journal recovery?
0407  *
0408  * 1. jid fails
0409  * 2. lvb bit set for step 1
0410  * 3. jid recovered for step 1
0411  * 4. jid taken again (new mount)
0412  * 5. jid fails (for step 4)
0413  * 6. lvb bit set for step 5 (will already be set)
0414  * 7. lvb bit cleared for step 3
0415  *
0416  * This is not a problem because the failure in step 5 does not
0417  * require recovery, because the mount in step 4 could not have
0418  * progressed far enough to unblock locks and access the fs.  The
0419  * control_mount() function waits for all recoveries to be complete
0420  * for the latest lockspace generation before ever unblocking locks
0421  * and returning.  The mount in step 4 waits until the recovery in
0422  * step 1 is done.
0423  *
0424  * - special case of first mounter: first node to mount the fs
0425  *
0426  * The first node to mount a gfs2 fs needs to check all the journals
0427  * and recover any that need recovery before other nodes are allowed
0428  * to mount the fs.  (Others may begin mounting, but they must wait
0429  * for the first mounter to be done before taking locks on the fs
0430  * or accessing the fs.)  This has two parts:
0431  *
0432  * 1. The mounted_lock tells a node it's the first to mount the fs.
0433  * Each node holds the mounted_lock in PR while it's mounted.
0434  * Each node tries to acquire the mounted_lock in EX when it mounts.
0435  * If a node is granted the mounted_lock EX it means there are no
0436  * other mounted nodes (no PR locks exist), and it is the first mounter.
0437  * The mounted_lock is demoted to PR when first recovery is done, so
0438  * others will fail to get an EX lock, but will get a PR lock.
0439  *
0440  * 2. The control_lock blocks others in control_mount() while the first
0441  * mounter is doing first mount recovery of all journals.
0442  * A mounting node needs to acquire control_lock in EX mode before
0443  * it can proceed.  The first mounter holds control_lock in EX while doing
0444  * the first mount recovery, blocking mounts from other nodes, then demotes
0445  * control_lock to NL when it's done (others_may_mount/first_done),
0446  * allowing other nodes to continue mounting.
0447  *
0448  * first mounter:
0449  * control_lock EX/NOQUEUE success
0450  * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
0451  * set first=1
0452  * do first mounter recovery
0453  * mounted_lock EX->PR
0454  * control_lock EX->NL, write lvb generation
0455  *
0456  * other mounter:
0457  * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
0458  * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
0459  * mounted_lock PR/NOQUEUE success
0460  * read lvb generation
0461  * control_lock EX->NL
0462  * set first=0
0463  *
0464  * - mount during recovery
0465  *
0466  * If a node mounts while others are doing recovery (not first mounter),
0467  * the mounting node will get its initial recover_done() callback without
0468  * having seen any previous failures/callbacks.
0469  *
0470  * It must wait for all recoveries preceding its mount to be finished
0471  * before it unblocks locks.  It does this by repeating the "other mounter"
0472  * steps above until the lvb generation number is >= its mount generation
0473  * number (from initial recover_done) and all lvb bits are clear.
0474  *
0475  * - control_lock lvb format
0476  *
0477  * 4 bytes generation number: the latest dlm lockspace generation number
0478  * from recover_done callback.  Indicates the jid bitmap has been updated
0479  * to reflect all slot failures through that generation.
0480  * 4 bytes unused.
0481  * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
0482  * that jid N needs recovery.
0483  */
0484 
0485 #define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */
0486 
0487 static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
0488                  char *lvb_bits)
0489 {
0490     __le32 gen;
0491     memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
0492     memcpy(&gen, lvb_bits, sizeof(__le32));
0493     *lvb_gen = le32_to_cpu(gen);
0494 }
0495 
0496 static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
0497                   char *lvb_bits)
0498 {
0499     __le32 gen;
0500     memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
0501     gen = cpu_to_le32(lvb_gen);
0502     memcpy(ls->ls_control_lvb, &gen, sizeof(__le32));
0503 }
0504 
0505 static int all_jid_bits_clear(char *lvb)
0506 {
0507     return !memchr_inv(lvb + JID_BITMAP_OFFSET, 0,
0508             GDLM_LVB_SIZE - JID_BITMAP_OFFSET);
0509 }
0510 
0511 static void sync_wait_cb(void *arg)
0512 {
0513     struct lm_lockstruct *ls = arg;
0514     complete(&ls->ls_sync_wait);
0515 }
0516 
0517 static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
0518 {
0519     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0520     int error;
0521 
0522     error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
0523     if (error) {
0524         fs_err(sdp, "%s lkid %x error %d\n",
0525                name, lksb->sb_lkid, error);
0526         return error;
0527     }
0528 
0529     wait_for_completion(&ls->ls_sync_wait);
0530 
0531     if (lksb->sb_status != -DLM_EUNLOCK) {
0532         fs_err(sdp, "%s lkid %x status %d\n",
0533                name, lksb->sb_lkid, lksb->sb_status);
0534         return -1;
0535     }
0536     return 0;
0537 }
0538 
0539 static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
0540              unsigned int num, struct dlm_lksb *lksb, char *name)
0541 {
0542     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0543     char strname[GDLM_STRNAME_BYTES];
0544     int error, status;
0545 
0546     memset(strname, 0, GDLM_STRNAME_BYTES);
0547     snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);
0548 
0549     error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
0550              strname, GDLM_STRNAME_BYTES - 1,
0551              0, sync_wait_cb, ls, NULL);
0552     if (error) {
0553         fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
0554                name, lksb->sb_lkid, flags, mode, error);
0555         return error;
0556     }
0557 
0558     wait_for_completion(&ls->ls_sync_wait);
0559 
0560     status = lksb->sb_status;
0561 
0562     if (status && status != -EAGAIN) {
0563         fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
0564                name, lksb->sb_lkid, flags, mode, status);
0565     }
0566 
0567     return status;
0568 }
0569 
0570 static int mounted_unlock(struct gfs2_sbd *sdp)
0571 {
0572     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0573     return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
0574 }
0575 
0576 static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
0577 {
0578     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0579     return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
0580              &ls->ls_mounted_lksb, "mounted_lock");
0581 }
0582 
0583 static int control_unlock(struct gfs2_sbd *sdp)
0584 {
0585     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0586     return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
0587 }
0588 
0589 static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
0590 {
0591     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0592     return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
0593              &ls->ls_control_lksb, "control_lock");
0594 }
0595 
0596 /**
0597  * remote_withdraw - react to a node withdrawing from the file system
0598  * @sdp: The superblock
0599  */
0600 static void remote_withdraw(struct gfs2_sbd *sdp)
0601 {
0602     struct gfs2_jdesc *jd;
0603     int ret = 0, count = 0;
0604 
0605     list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
0606         if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
0607             continue;
0608         ret = gfs2_recover_journal(jd, true);
0609         if (ret)
0610             break;
0611         count++;
0612     }
0613 
0614     /* Now drop the additional reference we acquired */
0615     fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
0616 }
0617 
0618 static void gfs2_control_func(struct work_struct *work)
0619 {
0620     struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
0621     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0622     uint32_t block_gen, start_gen, lvb_gen, flags;
0623     int recover_set = 0;
0624     int write_lvb = 0;
0625     int recover_size;
0626     int i, error;
0627 
0628     /* First check for other nodes that may have done a withdraw. */
0629     if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
0630         remote_withdraw(sdp);
0631         clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
0632         return;
0633     }
0634 
0635     spin_lock(&ls->ls_recover_spin);
0636     /*
0637      * No MOUNT_DONE means we're still mounting; control_mount()
0638      * will set this flag, after which this thread will take over
0639      * all further clearing of BLOCK_LOCKS.
0640      *
0641      * FIRST_MOUNT means this node is doing first mounter recovery,
0642      * for which recovery control is handled by
0643      * control_mount()/control_first_done(), not this thread.
0644      */
0645     if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
0646          test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
0647         spin_unlock(&ls->ls_recover_spin);
0648         return;
0649     }
0650     block_gen = ls->ls_recover_block;
0651     start_gen = ls->ls_recover_start;
0652     spin_unlock(&ls->ls_recover_spin);
0653 
0654     /*
0655      * Equal block_gen and start_gen implies we are between
0656      * recover_prep and recover_done callbacks, which means
0657      * dlm recovery is in progress and dlm locking is blocked.
0658      * There's no point trying to do any work until recover_done.
0659      */
0660 
0661     if (block_gen == start_gen)
0662         return;
0663 
0664     /*
0665      * Propagate recover_submit[] and recover_result[] to lvb:
0666      * dlm_recoverd adds to recover_submit[] jids needing recovery
0667      * gfs2_recover adds to recover_result[] journal recovery results
0668      *
0669      * set lvb bit for jids in recover_submit[] if the lvb has not
0670      * yet been updated for the generation of the failure
0671      *
0672      * clear lvb bit for jids in recover_result[] if the result of
0673      * the journal recovery is SUCCESS
0674      */
0675 
0676     error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
0677     if (error) {
0678         fs_err(sdp, "control lock EX error %d\n", error);
0679         return;
0680     }
0681 
0682     control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
0683 
0684     spin_lock(&ls->ls_recover_spin);
0685     if (block_gen != ls->ls_recover_block ||
0686         start_gen != ls->ls_recover_start) {
0687         fs_info(sdp, "recover generation %u block1 %u %u\n",
0688             start_gen, block_gen, ls->ls_recover_block);
0689         spin_unlock(&ls->ls_recover_spin);
0690         control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
0691         return;
0692     }
0693 
0694     recover_size = ls->ls_recover_size;
0695 
0696     if (lvb_gen <= start_gen) {
0697         /*
0698          * Clear lvb bits for jids we've successfully recovered.
0699          * Because all nodes attempt to recover failed journals,
0700          * a journal can be recovered multiple times successfully
0701          * in succession.  Only the first will really do recovery,
0702          * the others find it clean, but still report a successful
0703          * recovery.  So, another node may have already recovered
0704          * the jid and cleared the lvb bit for it.
0705          */
0706         for (i = 0; i < recover_size; i++) {
0707             if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
0708                 continue;
0709 
0710             ls->ls_recover_result[i] = 0;
0711 
0712             if (!test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET))
0713                 continue;
0714 
0715             __clear_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
0716             write_lvb = 1;
0717         }
0718     }
0719 
0720     if (lvb_gen == start_gen) {
0721         /*
0722          * Failed slots before start_gen are already set in lvb.
0723          */
0724         for (i = 0; i < recover_size; i++) {
0725             if (!ls->ls_recover_submit[i])
0726                 continue;
0727             if (ls->ls_recover_submit[i] < lvb_gen)
0728                 ls->ls_recover_submit[i] = 0;
0729         }
0730     } else if (lvb_gen < start_gen) {
0731         /*
0732          * Failed slots before start_gen are not yet set in lvb.
0733          */
0734         for (i = 0; i < recover_size; i++) {
0735             if (!ls->ls_recover_submit[i])
0736                 continue;
0737             if (ls->ls_recover_submit[i] < start_gen) {
0738                 ls->ls_recover_submit[i] = 0;
0739                 __set_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
0740             }
0741         }
0742         /* even if there are no bits to set, we need to write the
0743            latest generation to the lvb */
0744         write_lvb = 1;
0745     } else {
0746         /*
0747          * we should be getting a recover_done() for lvb_gen soon
0748          */
0749     }
0750     spin_unlock(&ls->ls_recover_spin);
0751 
0752     if (write_lvb) {
0753         control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
0754         flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
0755     } else {
0756         flags = DLM_LKF_CONVERT;
0757     }
0758 
0759     error = control_lock(sdp, DLM_LOCK_NL, flags);
0760     if (error) {
0761         fs_err(sdp, "control lock NL error %d\n", error);
0762         return;
0763     }
0764 
0765     /*
0766      * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
0767      * and clear a jid bit in the lvb if the recovery is a success.
0768      * Eventually all journals will be recovered, all jid bits will
0769      * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
0770      */
0771 
0772     for (i = 0; i < recover_size; i++) {
0773         if (test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET)) {
0774             fs_info(sdp, "recover generation %u jid %d\n",
0775                 start_gen, i);
0776             gfs2_recover_set(sdp, i);
0777             recover_set++;
0778         }
0779     }
0780     if (recover_set)
0781         return;
0782 
0783     /*
0784      * No more jid bits set in lvb, all recovery is done, unblock locks
0785      * (unless a new recover_prep callback has occured blocking locks
0786      * again while working above)
0787      */
0788 
0789     spin_lock(&ls->ls_recover_spin);
0790     if (ls->ls_recover_block == block_gen &&
0791         ls->ls_recover_start == start_gen) {
0792         clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
0793         spin_unlock(&ls->ls_recover_spin);
0794         fs_info(sdp, "recover generation %u done\n", start_gen);
0795         gfs2_glock_thaw(sdp);
0796     } else {
0797         fs_info(sdp, "recover generation %u block2 %u %u\n",
0798             start_gen, block_gen, ls->ls_recover_block);
0799         spin_unlock(&ls->ls_recover_spin);
0800     }
0801 }
0802 
0803 static int control_mount(struct gfs2_sbd *sdp)
0804 {
0805     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
0806     uint32_t start_gen, block_gen, mount_gen, lvb_gen;
0807     int mounted_mode;
0808     int retries = 0;
0809     int error;
0810 
0811     memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
0812     memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
0813     memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
0814     ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
0815     init_completion(&ls->ls_sync_wait);
0816 
0817     set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
0818 
0819     error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
0820     if (error) {
0821         fs_err(sdp, "control_mount control_lock NL error %d\n", error);
0822         return error;
0823     }
0824 
0825     error = mounted_lock(sdp, DLM_LOCK_NL, 0);
0826     if (error) {
0827         fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
0828         control_unlock(sdp);
0829         return error;
0830     }
0831     mounted_mode = DLM_LOCK_NL;
0832 
0833 restart:
0834     if (retries++ && signal_pending(current)) {
0835         error = -EINTR;
0836         goto fail;
0837     }
0838 
0839     /*
0840      * We always start with both locks in NL. control_lock is
0841      * demoted to NL below so we don't need to do it here.
0842      */
0843 
0844     if (mounted_mode != DLM_LOCK_NL) {
0845         error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
0846         if (error)
0847             goto fail;
0848         mounted_mode = DLM_LOCK_NL;
0849     }
0850 
0851     /*
0852      * Other nodes need to do some work in dlm recovery and gfs2_control
0853      * before the recover_done and control_lock will be ready for us below.
0854      * A delay here is not required but often avoids having to retry.
0855      */
0856 
0857     msleep_interruptible(500);
0858 
0859     /*
0860      * Acquire control_lock in EX and mounted_lock in either EX or PR.
0861      * control_lock lvb keeps track of any pending journal recoveries.
0862      * mounted_lock indicates if any other nodes have the fs mounted.
0863      */
0864 
0865     error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
0866     if (error == -EAGAIN) {
0867         goto restart;
0868     } else if (error) {
0869         fs_err(sdp, "control_mount control_lock EX error %d\n", error);
0870         goto fail;
0871     }
0872 
0873     /**
0874      * If we're a spectator, we don't want to take the lock in EX because
0875      * we cannot do the first-mount responsibility it implies: recovery.
0876      */
0877     if (sdp->sd_args.ar_spectator)
0878         goto locks_done;
0879 
0880     error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
0881     if (!error) {
0882         mounted_mode = DLM_LOCK_EX;
0883         goto locks_done;
0884     } else if (error != -EAGAIN) {
0885         fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
0886         goto fail;
0887     }
0888 
0889     error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
0890     if (!error) {
0891         mounted_mode = DLM_LOCK_PR;
0892         goto locks_done;
0893     } else {
0894         /* not even -EAGAIN should happen here */
0895         fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
0896         goto fail;
0897     }
0898 
0899 locks_done:
0900     /*
0901      * If we got both locks above in EX, then we're the first mounter.
0902      * If not, then we need to wait for the control_lock lvb to be
0903      * updated by other mounted nodes to reflect our mount generation.
0904      *
0905      * In simple first mounter cases, first mounter will see zero lvb_gen,
0906      * but in cases where all existing nodes leave/fail before mounting
0907      * nodes finish control_mount, then all nodes will be mounting and
0908      * lvb_gen will be non-zero.
0909      */
0910 
0911     control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
0912 
0913     if (lvb_gen == 0xFFFFFFFF) {
0914         /* special value to force mount attempts to fail */
0915         fs_err(sdp, "control_mount control_lock disabled\n");
0916         error = -EINVAL;
0917         goto fail;
0918     }
0919 
0920     if (mounted_mode == DLM_LOCK_EX) {
0921         /* first mounter, keep both EX while doing first recovery */
0922         spin_lock(&ls->ls_recover_spin);
0923         clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
0924         set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
0925         set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
0926         spin_unlock(&ls->ls_recover_spin);
0927         fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
0928         return 0;
0929     }
0930 
0931     error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
0932     if (error)
0933         goto fail;
0934 
0935     /*
0936      * We are not first mounter, now we need to wait for the control_lock
0937      * lvb generation to be >= the generation from our first recover_done
0938      * and all lvb bits to be clear (no pending journal recoveries.)
0939      */
0940 
0941     if (!all_jid_bits_clear(ls->ls_lvb_bits)) {
0942         /* journals need recovery, wait until all are clear */
0943         fs_info(sdp, "control_mount wait for journal recovery\n");
0944         goto restart;
0945     }
0946 
0947     spin_lock(&ls->ls_recover_spin);
0948     block_gen = ls->ls_recover_block;
0949     start_gen = ls->ls_recover_start;
0950     mount_gen = ls->ls_recover_mount;
0951 
0952     if (lvb_gen < mount_gen) {
0953         /* wait for mounted nodes to update control_lock lvb to our
0954            generation, which might include new recovery bits set */
0955         if (sdp->sd_args.ar_spectator) {
0956             fs_info(sdp, "Recovery is required. Waiting for a "
0957                 "non-spectator to mount.\n");
0958             msleep_interruptible(1000);
0959         } else {
0960             fs_info(sdp, "control_mount wait1 block %u start %u "
0961                 "mount %u lvb %u flags %lx\n", block_gen,
0962                 start_gen, mount_gen, lvb_gen,
0963                 ls->ls_recover_flags);
0964         }
0965         spin_unlock(&ls->ls_recover_spin);
0966         goto restart;
0967     }
0968 
0969     if (lvb_gen != start_gen) {
0970         /* wait for mounted nodes to update control_lock lvb to the
0971            latest recovery generation */
0972         fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
0973             "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
0974             lvb_gen, ls->ls_recover_flags);
0975         spin_unlock(&ls->ls_recover_spin);
0976         goto restart;
0977     }
0978 
0979     if (block_gen == start_gen) {
0980         /* dlm recovery in progress, wait for it to finish */
0981         fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
0982             "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
0983             lvb_gen, ls->ls_recover_flags);
0984         spin_unlock(&ls->ls_recover_spin);
0985         goto restart;
0986     }
0987 
0988     clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
0989     set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
0990     memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
0991     memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
0992     spin_unlock(&ls->ls_recover_spin);
0993     return 0;
0994 
0995 fail:
0996     mounted_unlock(sdp);
0997     control_unlock(sdp);
0998     return error;
0999 }
1000 
1001 static int control_first_done(struct gfs2_sbd *sdp)
1002 {
1003     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1004     uint32_t start_gen, block_gen;
1005     int error;
1006 
1007 restart:
1008     spin_lock(&ls->ls_recover_spin);
1009     start_gen = ls->ls_recover_start;
1010     block_gen = ls->ls_recover_block;
1011 
1012     if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
1013         !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1014         !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1015         /* sanity check, should not happen */
1016         fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
1017                start_gen, block_gen, ls->ls_recover_flags);
1018         spin_unlock(&ls->ls_recover_spin);
1019         control_unlock(sdp);
1020         return -1;
1021     }
1022 
1023     if (start_gen == block_gen) {
1024         /*
1025          * Wait for the end of a dlm recovery cycle to switch from
1026          * first mounter recovery.  We can ignore any recover_slot
1027          * callbacks between the recover_prep and next recover_done
1028          * because we are still the first mounter and any failed nodes
1029          * have not fully mounted, so they don't need recovery.
1030          */
1031         spin_unlock(&ls->ls_recover_spin);
1032         fs_info(sdp, "control_first_done wait gen %u\n", start_gen);
1033 
1034         wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
1035                 TASK_UNINTERRUPTIBLE);
1036         goto restart;
1037     }
1038 
1039     clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1040     set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
1041     memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
1042     memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
1043     spin_unlock(&ls->ls_recover_spin);
1044 
1045     memset(ls->ls_lvb_bits, 0, GDLM_LVB_SIZE);
1046     control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
1047 
1048     error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
1049     if (error)
1050         fs_err(sdp, "control_first_done mounted PR error %d\n", error);
1051 
1052     error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
1053     if (error)
1054         fs_err(sdp, "control_first_done control NL error %d\n", error);
1055 
1056     return error;
1057 }
1058 
1059 /*
1060  * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
1061  * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
1062  * gfs2 jids start at 0, so jid = slot - 1)
1063  */
1064 
1065 #define RECOVER_SIZE_INC 16
1066 
1067 static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
1068                 int num_slots)
1069 {
1070     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1071     uint32_t *submit = NULL;
1072     uint32_t *result = NULL;
1073     uint32_t old_size, new_size;
1074     int i, max_jid;
1075 
1076     if (!ls->ls_lvb_bits) {
1077         ls->ls_lvb_bits = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
1078         if (!ls->ls_lvb_bits)
1079             return -ENOMEM;
1080     }
1081 
1082     max_jid = 0;
1083     for (i = 0; i < num_slots; i++) {
1084         if (max_jid < slots[i].slot - 1)
1085             max_jid = slots[i].slot - 1;
1086     }
1087 
1088     old_size = ls->ls_recover_size;
1089     new_size = old_size;
1090     while (new_size < max_jid + 1)
1091         new_size += RECOVER_SIZE_INC;
1092     if (new_size == old_size)
1093         return 0;
1094 
1095     submit = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1096     result = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1097     if (!submit || !result) {
1098         kfree(submit);
1099         kfree(result);
1100         return -ENOMEM;
1101     }
1102 
1103     spin_lock(&ls->ls_recover_spin);
1104     memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
1105     memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
1106     kfree(ls->ls_recover_submit);
1107     kfree(ls->ls_recover_result);
1108     ls->ls_recover_submit = submit;
1109     ls->ls_recover_result = result;
1110     ls->ls_recover_size = new_size;
1111     spin_unlock(&ls->ls_recover_spin);
1112     return 0;
1113 }
1114 
1115 static void free_recover_size(struct lm_lockstruct *ls)
1116 {
1117     kfree(ls->ls_lvb_bits);
1118     kfree(ls->ls_recover_submit);
1119     kfree(ls->ls_recover_result);
1120     ls->ls_recover_submit = NULL;
1121     ls->ls_recover_result = NULL;
1122     ls->ls_recover_size = 0;
1123     ls->ls_lvb_bits = NULL;
1124 }
1125 
1126 /* dlm calls before it does lock recovery */
1127 
1128 static void gdlm_recover_prep(void *arg)
1129 {
1130     struct gfs2_sbd *sdp = arg;
1131     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1132 
1133     if (gfs2_withdrawn(sdp)) {
1134         fs_err(sdp, "recover_prep ignored due to withdraw.\n");
1135         return;
1136     }
1137     spin_lock(&ls->ls_recover_spin);
1138     ls->ls_recover_block = ls->ls_recover_start;
1139     set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1140 
1141     if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1142          test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1143         spin_unlock(&ls->ls_recover_spin);
1144         return;
1145     }
1146     set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
1147     spin_unlock(&ls->ls_recover_spin);
1148 }
1149 
1150 /* dlm calls after recover_prep has been completed on all lockspace members;
1151    identifies slot/jid of failed member */
1152 
1153 static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
1154 {
1155     struct gfs2_sbd *sdp = arg;
1156     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1157     int jid = slot->slot - 1;
1158 
1159     if (gfs2_withdrawn(sdp)) {
1160         fs_err(sdp, "recover_slot jid %d ignored due to withdraw.\n",
1161                jid);
1162         return;
1163     }
1164     spin_lock(&ls->ls_recover_spin);
1165     if (ls->ls_recover_size < jid + 1) {
1166         fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
1167                jid, ls->ls_recover_block, ls->ls_recover_size);
1168         spin_unlock(&ls->ls_recover_spin);
1169         return;
1170     }
1171 
1172     if (ls->ls_recover_submit[jid]) {
1173         fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
1174             jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
1175     }
1176     ls->ls_recover_submit[jid] = ls->ls_recover_block;
1177     spin_unlock(&ls->ls_recover_spin);
1178 }
1179 
1180 /* dlm calls after recover_slot and after it completes lock recovery */
1181 
1182 static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
1183                   int our_slot, uint32_t generation)
1184 {
1185     struct gfs2_sbd *sdp = arg;
1186     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1187 
1188     if (gfs2_withdrawn(sdp)) {
1189         fs_err(sdp, "recover_done ignored due to withdraw.\n");
1190         return;
1191     }
1192     /* ensure the ls jid arrays are large enough */
1193     set_recover_size(sdp, slots, num_slots);
1194 
1195     spin_lock(&ls->ls_recover_spin);
1196     ls->ls_recover_start = generation;
1197 
1198     if (!ls->ls_recover_mount) {
1199         ls->ls_recover_mount = generation;
1200         ls->ls_jid = our_slot - 1;
1201     }
1202 
1203     if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1204         queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
1205 
1206     clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1207     smp_mb__after_atomic();
1208     wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
1209     spin_unlock(&ls->ls_recover_spin);
1210 }
1211 
1212 /* gfs2_recover thread has a journal recovery result */
1213 
1214 static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
1215                  unsigned int result)
1216 {
1217     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1218 
1219     if (gfs2_withdrawn(sdp)) {
1220         fs_err(sdp, "recovery_result jid %d ignored due to withdraw.\n",
1221                jid);
1222         return;
1223     }
1224     if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1225         return;
1226 
1227     /* don't care about the recovery of own journal during mount */
1228     if (jid == ls->ls_jid)
1229         return;
1230 
1231     spin_lock(&ls->ls_recover_spin);
1232     if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1233         spin_unlock(&ls->ls_recover_spin);
1234         return;
1235     }
1236     if (ls->ls_recover_size < jid + 1) {
1237         fs_err(sdp, "recovery_result jid %d short size %d\n",
1238                jid, ls->ls_recover_size);
1239         spin_unlock(&ls->ls_recover_spin);
1240         return;
1241     }
1242 
1243     fs_info(sdp, "recover jid %d result %s\n", jid,
1244         result == LM_RD_GAVEUP ? "busy" : "success");
1245 
1246     ls->ls_recover_result[jid] = result;
1247 
1248     /* GAVEUP means another node is recovering the journal; delay our
1249        next attempt to recover it, to give the other node a chance to
1250        finish before trying again */
1251 
1252     if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1253         queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
1254                    result == LM_RD_GAVEUP ? HZ : 0);
1255     spin_unlock(&ls->ls_recover_spin);
1256 }
1257 
/*
 * Recovery callbacks handed to dlm_new_lockspace() by gdlm_mount().
 * Per the comments on the handlers, dlm calls them in this order:
 * recover_prep before lock recovery, recover_slot for a failed member,
 * and recover_done once lock recovery has completed.
 */
static const struct dlm_lockspace_ops gdlm_lockspace_ops = {
	.recover_prep = gdlm_recover_prep,
	.recover_slot = gdlm_recover_slot,
	.recover_done = gdlm_recover_done,
};
1263 
1264 static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
1265 {
1266     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1267     char cluster[GFS2_LOCKNAME_LEN];
1268     const char *fsname;
1269     uint32_t flags;
1270     int error, ops_result;
1271 
1272     /*
1273      * initialize everything
1274      */
1275 
1276     INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
1277     spin_lock_init(&ls->ls_recover_spin);
1278     ls->ls_recover_flags = 0;
1279     ls->ls_recover_mount = 0;
1280     ls->ls_recover_start = 0;
1281     ls->ls_recover_block = 0;
1282     ls->ls_recover_size = 0;
1283     ls->ls_recover_submit = NULL;
1284     ls->ls_recover_result = NULL;
1285     ls->ls_lvb_bits = NULL;
1286 
1287     error = set_recover_size(sdp, NULL, 0);
1288     if (error)
1289         goto fail;
1290 
1291     /*
1292      * prepare dlm_new_lockspace args
1293      */
1294 
1295     fsname = strchr(table, ':');
1296     if (!fsname) {
1297         fs_info(sdp, "no fsname found\n");
1298         error = -EINVAL;
1299         goto fail_free;
1300     }
1301     memset(cluster, 0, sizeof(cluster));
1302     memcpy(cluster, table, strlen(table) - strlen(fsname));
1303     fsname++;
1304 
1305     flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
1306 
1307     /*
1308      * create/join lockspace
1309      */
1310 
1311     error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
1312                   &gdlm_lockspace_ops, sdp, &ops_result,
1313                   &ls->ls_dlm);
1314     if (error) {
1315         fs_err(sdp, "dlm_new_lockspace error %d\n", error);
1316         goto fail_free;
1317     }
1318 
1319     if (ops_result < 0) {
1320         /*
1321          * dlm does not support ops callbacks,
1322          * old dlm_controld/gfs_controld are used, try without ops.
1323          */
1324         fs_info(sdp, "dlm lockspace ops not used\n");
1325         free_recover_size(ls);
1326         set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
1327         return 0;
1328     }
1329 
1330     if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
1331         fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
1332         error = -EINVAL;
1333         goto fail_release;
1334     }
1335 
1336     /*
1337      * control_mount() uses control_lock to determine first mounter,
1338      * and for later mounts, waits for any recoveries to be cleared.
1339      */
1340 
1341     error = control_mount(sdp);
1342     if (error) {
1343         fs_err(sdp, "mount control error %d\n", error);
1344         goto fail_release;
1345     }
1346 
1347     ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1348     clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
1349     smp_mb__after_atomic();
1350     wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
1351     return 0;
1352 
1353 fail_release:
1354     dlm_release_lockspace(ls->ls_dlm, 2);
1355 fail_free:
1356     free_recover_size(ls);
1357 fail:
1358     return error;
1359 }
1360 
1361 static void gdlm_first_done(struct gfs2_sbd *sdp)
1362 {
1363     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1364     int error;
1365 
1366     if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1367         return;
1368 
1369     error = control_first_done(sdp);
1370     if (error)
1371         fs_err(sdp, "mount first_done error %d\n", error);
1372 }
1373 
1374 static void gdlm_unmount(struct gfs2_sbd *sdp)
1375 {
1376     struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1377 
1378     if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1379         goto release;
1380 
1381     /* wait for gfs2_control_wq to be done with this mount */
1382 
1383     spin_lock(&ls->ls_recover_spin);
1384     set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
1385     spin_unlock(&ls->ls_recover_spin);
1386     flush_delayed_work(&sdp->sd_control_work);
1387 
1388     /* mounted_lock and control_lock will be purged in dlm recovery */
1389 release:
1390     if (ls->ls_dlm) {
1391         dlm_release_lockspace(ls->ls_dlm, 2);
1392         ls->ls_dlm = NULL;
1393     }
1394 
1395     free_recover_size(ls);
1396 }
1397 
/*
 * Option patterns exported through gfs2_dlm_ops.lm_tokens.  Each pattern is
 * a "name=<integer>" pair; the { Opt_err, NULL } entry terminates the table.
 */
static const match_table_t dlm_tokens = {
	{ Opt_jid, "jid=%d"},
	{ Opt_id, "id=%d"},
	{ Opt_first, "first=%d"},
	{ Opt_nodir, "nodir=%d"},
	{ Opt_err, NULL },
};
1405 
/*
 * Lock module operations for the "lock_dlm" protocol.  The mount,
 * first_done, recovery_result and unmount handlers are the gdlm_*
 * functions above; gdlm_put_lock, gdlm_lock and gdlm_cancel are defined
 * outside this section of the file.
 */
const struct lm_lockops gfs2_dlm_ops = {
	.lm_proto_name = "lock_dlm",
	.lm_mount = gdlm_mount,
	.lm_first_done = gdlm_first_done,
	.lm_recovery_result = gdlm_recovery_result,
	.lm_unmount = gdlm_unmount,
	.lm_put_lock = gdlm_put_lock,
	.lm_lock = gdlm_lock,
	.lm_cancel = gdlm_cancel,
	.lm_tokens = &dlm_tokens,
};
1417