Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /******************************************************************************
0003 *******************************************************************************
0004 **
0005 **  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
0006 **
0007 **
0008 *******************************************************************************
0009 ******************************************************************************/
0010 
0011 #include "dlm_internal.h"
0012 #include "lockspace.h"
0013 #include "member.h"
0014 #include "recoverd.h"
0015 #include "recover.h"
0016 #include "rcom.h"
0017 #include "config.h"
0018 #include "midcomms.h"
0019 #include "lowcomms.h"
0020 
0021 int dlm_slots_version(struct dlm_header *h)
0022 {
0023     if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
0024         return 0;
0025     return 1;
0026 }
0027 
0028 void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
0029            struct dlm_member *memb)
0030 {
0031     struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
0032 
0033     if (!dlm_slots_version(&rc->rc_header))
0034         return;
0035 
0036     memb->slot = le16_to_cpu(rf->rf_our_slot);
0037     memb->generation = le32_to_cpu(rf->rf_generation);
0038 }
0039 
0040 void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
0041 {
0042     struct dlm_slot *slot;
0043     struct rcom_slot *ro;
0044     int i;
0045 
0046     ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
0047 
0048     /* ls_slots array is sparse, but not rcom_slots */
0049 
0050     for (i = 0; i < ls->ls_slots_size; i++) {
0051         slot = &ls->ls_slots[i];
0052         if (!slot->nodeid)
0053             continue;
0054         ro->ro_nodeid = cpu_to_le32(slot->nodeid);
0055         ro->ro_slot = cpu_to_le16(slot->slot);
0056         ro++;
0057     }
0058 }
0059 
0060 #define SLOT_DEBUG_LINE 128
0061 
0062 static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
0063               struct rcom_slot *ro0, struct dlm_slot *array,
0064               int array_size)
0065 {
0066     char line[SLOT_DEBUG_LINE];
0067     int len = SLOT_DEBUG_LINE - 1;
0068     int pos = 0;
0069     int ret, i;
0070 
0071     memset(line, 0, sizeof(line));
0072 
0073     if (array) {
0074         for (i = 0; i < array_size; i++) {
0075             if (!array[i].nodeid)
0076                 continue;
0077 
0078             ret = snprintf(line + pos, len - pos, " %d:%d",
0079                        array[i].slot, array[i].nodeid);
0080             if (ret >= len - pos)
0081                 break;
0082             pos += ret;
0083         }
0084     } else if (ro0) {
0085         for (i = 0; i < num_slots; i++) {
0086             ret = snprintf(line + pos, len - pos, " %d:%d",
0087                        ro0[i].ro_slot, ro0[i].ro_nodeid);
0088             if (ret >= len - pos)
0089                 break;
0090             pos += ret;
0091         }
0092     }
0093 
0094     log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
0095 }
0096 
/*
 * Parse the slot table received during recovery (ls->ls_recover_buf)
 * and record each current member's slot.  Returns 0 on success, -1 if
 * the sender does not support slots, the table is empty, or the data
 * conflicts with our existing state.
 */
int dlm_slots_copy_in(struct dlm_ls *ls)
{
    struct dlm_member *memb;
    struct dlm_rcom *rc = ls->ls_recover_buf;
    struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
    struct rcom_slot *ro0, *ro;
    int our_nodeid = dlm_our_nodeid();
    int i, num_slots;
    uint32_t gen;

    if (!dlm_slots_version(&rc->rc_header))
        return -1;

    gen = le32_to_cpu(rf->rf_generation);
    /* generations should only move forward; log the anomaly but still
       adopt the received value */
    if (gen <= ls->ls_generation) {
        log_error(ls, "dlm_slots_copy_in gen %u old %u",
              gen, ls->ls_generation);
    }
    ls->ls_generation = gen;

    num_slots = le16_to_cpu(rf->rf_num_slots);
    if (!num_slots)
        return -1;

    /* the rcom_slot entries follow the rcom_config header in rc_buf */
    ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

    log_slots(ls, gen, num_slots, ro0, NULL, 0);

    /* match each current member against the received table */
    list_for_each_entry(memb, &ls->ls_nodes, list) {
        for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
            if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
                continue;
            memb->slot = le16_to_cpu(ro->ro_slot);
            memb->slot_prev = memb->slot;
            break;
        }

        if (memb->nodeid == our_nodeid) {
            /* our own slot must never change once assigned */
            if (ls->ls_slot && ls->ls_slot != memb->slot) {
                log_error(ls, "dlm_slots_copy_in our slot "
                      "changed %d %d", ls->ls_slot,
                      memb->slot);
                return -1;
            }

            if (!ls->ls_slot)
                ls->ls_slot = memb->slot;
        }

        /* every current member must appear in the table */
        if (!memb->slot) {
            log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
                   memb->nodeid);
            return -1;
        }
    }

    return 0;
}
0155 
/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

/*
 * Build the slot table for the current membership: keep slots members
 * already hold, hand free slots to new members, and advance the
 * generation.  On success returns 0 with *slots_out (kcalloc'd array;
 * caller owns and frees it), *slots_size, *num_slots and *gen_out set.
 * Returns -1 if any member lacks slot support or state is
 * inconsistent, -ENOMEM on allocation failure.
 */
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
             struct dlm_slot **slots_out, uint32_t *gen_out)
{
    struct dlm_member *memb;
    struct dlm_slot *array;
    int our_nodeid = dlm_our_nodeid();
    int array_size, max_slots, i;
    int need = 0;       /* members still needing a slot */
    int max = 0;        /* highest slot number currently held */
    int num = 0;        /* member count, then filled-slot count */
    uint32_t gen = 0;   /* highest generation seen among members */

    /* our own memb struct will have slot -1 gen 0 */

    list_for_each_entry(memb, &ls->ls_nodes, list) {
        if (memb->nodeid == our_nodeid) {
            memb->slot = ls->ls_slot;
            memb->generation = ls->ls_generation;
            break;
        }
    }

    list_for_each_entry(memb, &ls->ls_nodes, list) {
        if (memb->generation > gen)
            gen = memb->generation;

        /* node doesn't support slots */

        if (memb->slot == -1)
            return -1;

        /* node needs a slot assigned */

        if (!memb->slot)
            need++;

        /* node has a slot assigned */

        num++;

        if (!max || max < memb->slot)
            max = memb->slot;

        /* sanity check, once slot is assigned it shouldn't change */

        if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
            log_error(ls, "nodeid %d slot changed %d %d",
                  memb->nodeid, memb->slot_prev, memb->slot);
            return -1;
        }
        memb->slot_prev = memb->slot;
    }

    /* worst case size: every held slot stays in place and each new
       member takes a fresh offset past the current maximum */
    array_size = max + need;
    array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
    if (!array)
        return -ENOMEM;

    num = 0;

    /* fill in slots (offsets) that are used */

    list_for_each_entry(memb, &ls->ls_nodes, list) {
        if (!memb->slot)
            continue;

        if (memb->slot > array_size) {
            log_error(ls, "invalid slot number %d", memb->slot);
            kfree(array);
            return -1;
        }

        /* slot numbers are 1-based; array index is slot - 1 */
        array[memb->slot - 1].nodeid = memb->nodeid;
        array[memb->slot - 1].slot = memb->slot;
        num++;
    }

    /* assign new slots from unused offsets */

    list_for_each_entry(memb, &ls->ls_nodes, list) {
        if (memb->slot)
            continue;

        for (i = 0; i < array_size; i++) {
            if (array[i].nodeid)
                continue;

            memb->slot = i + 1;
            memb->slot_prev = memb->slot;
            array[i].nodeid = memb->nodeid;
            array[i].slot = memb->slot;
            num++;

            if (!ls->ls_slot && memb->nodeid == our_nodeid)
                ls->ls_slot = memb->slot;
            break;
        }

        if (!memb->slot) {
            log_error(ls, "no free slot found");
            kfree(array);
            return -1;
        }
    }

    gen++;

    log_slots(ls, gen, num, NULL, array, array_size);

    /* the whole table must fit in a single rcom message */
    max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
             sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

    if (num > max_slots) {
        log_error(ls, "num_slots %d exceeds max_slots %d",
              num, max_slots);
        kfree(array);
        return -1;
    }

    *gen_out = gen;
    *slots_out = array;
    *slots_size = array_size;
    *num_slots = num;
    return 0;
}
0285 
0286 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
0287 {
0288     struct dlm_member *memb = NULL;
0289     struct list_head *tmp;
0290     struct list_head *newlist = &new->list;
0291     struct list_head *head = &ls->ls_nodes;
0292 
0293     list_for_each(tmp, head) {
0294         memb = list_entry(tmp, struct dlm_member, list);
0295         if (new->nodeid < memb->nodeid)
0296             break;
0297     }
0298 
0299     if (!memb)
0300         list_add_tail(newlist, head);
0301     else {
0302         /* FIXME: can use list macro here */
0303         newlist->prev = tmp->prev;
0304         newlist->next = tmp;
0305         tmp->prev->next = newlist;
0306         tmp->prev = newlist;
0307     }
0308 }
0309 
0310 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
0311 {
0312     struct dlm_member *memb;
0313     int error;
0314 
0315     memb = kzalloc(sizeof(*memb), GFP_NOFS);
0316     if (!memb)
0317         return -ENOMEM;
0318 
0319     error = dlm_lowcomms_connect_node(node->nodeid);
0320     if (error < 0) {
0321         kfree(memb);
0322         return error;
0323     }
0324 
0325     memb->nodeid = node->nodeid;
0326     memb->weight = node->weight;
0327     memb->comm_seq = node->comm_seq;
0328     dlm_midcomms_add_member(node->nodeid);
0329     add_ordered_member(ls, memb);
0330     ls->ls_num_nodes++;
0331     return 0;
0332 }
0333 
0334 static struct dlm_member *find_memb(struct list_head *head, int nodeid)
0335 {
0336     struct dlm_member *memb;
0337 
0338     list_for_each_entry(memb, head, list) {
0339         if (memb->nodeid == nodeid)
0340             return memb;
0341     }
0342     return NULL;
0343 }
0344 
0345 int dlm_is_member(struct dlm_ls *ls, int nodeid)
0346 {
0347     if (find_memb(&ls->ls_nodes, nodeid))
0348         return 1;
0349     return 0;
0350 }
0351 
0352 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
0353 {
0354     if (find_memb(&ls->ls_nodes_gone, nodeid))
0355         return 1;
0356     return 0;
0357 }
0358 
0359 static void clear_memb_list(struct list_head *head,
0360                 void (*after_del)(int nodeid))
0361 {
0362     struct dlm_member *memb;
0363 
0364     while (!list_empty(head)) {
0365         memb = list_entry(head->next, struct dlm_member, list);
0366         list_del(&memb->list);
0367         if (after_del)
0368             after_del(memb->nodeid);
0369         kfree(memb);
0370     }
0371 }
0372 
/* Per-member callback for dlm_clear_members(): drop the midcomms
   state for a removed node. */
static void clear_members_cb(int nodeid)
{
    dlm_midcomms_remove_member(nodeid);
}
0377 
/* Remove and free all current members, notifying midcomms for each,
   and reset the member count. */
void dlm_clear_members(struct dlm_ls *ls)
{
    clear_memb_list(&ls->ls_nodes, clear_members_cb);
    ls->ls_num_nodes = 0;
}
0383 
/* Free the list of departed members; no midcomms callback needed since
   dlm_recover_members() already notified it when they were moved here. */
void dlm_clear_members_gone(struct dlm_ls *ls)
{
    clear_memb_list(&ls->ls_nodes_gone, NULL);
}
0388 
/*
 * Rebuild ls->ls_node_array: a flat array of nodeids in which each
 * member appears once per unit of weight.  If every member has weight
 * 0, all members count as weight 1.
 * NOTE(review): on allocation failure ls_node_array is left NULL while
 * ls_total_weight has already been updated -- callers presumably
 * tolerate a NULL array; confirm.
 */
static void make_member_array(struct dlm_ls *ls)
{
    struct dlm_member *memb;
    int i, w, x = 0, total = 0, all_zero = 0, *array;

    kfree(ls->ls_node_array);
    ls->ls_node_array = NULL;

    list_for_each_entry(memb, &ls->ls_nodes, list) {
        if (memb->weight)
            total += memb->weight;
    }

    /* all nodes revert to weight of 1 if all have weight 0 */

    if (!total) {
        total = ls->ls_num_nodes;
        all_zero = 1;
    }

    ls->ls_total_weight = total;
    array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
    if (!array)
        return;

    list_for_each_entry(memb, &ls->ls_nodes, list) {
        /* weight-0 members get no entries unless everyone is 0 */
        if (!all_zero && !memb->weight)
            continue;

        if (all_zero)
            w = 1;
        else
            w = memb->weight;

        DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

        /* repeat the nodeid once per unit of weight */
        for (i = 0; i < w; i++)
            array[x++] = memb->nodeid;
    }

    ls->ls_node_array = array;
}
0431 
0432 /* send a status request to all members just to establish comms connections */
0433 
0434 static int ping_members(struct dlm_ls *ls)
0435 {
0436     struct dlm_member *memb;
0437     int error = 0;
0438 
0439     list_for_each_entry(memb, &ls->ls_nodes, list) {
0440         if (dlm_recovery_stopped(ls)) {
0441             error = -EINTR;
0442             break;
0443         }
0444         error = dlm_rcom_status(ls, memb->nodeid, 0);
0445         if (error)
0446             break;
0447     }
0448     if (error)
0449         log_rinfo(ls, "ping_members aborted %d last nodeid %d",
0450               error, ls->ls_recover_nodeid);
0451     return error;
0452 }
0453 
/* Invoke the lockspace user's recover_prep callback, if one was
   registered, to announce that recovery is starting. */
static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
    if (!ls->ls_ops || !ls->ls_ops->recover_prep)
        return;
    ls->ls_ops->recover_prep(ls->ls_ops_arg);
}
0460 
/*
 * Notify the lockspace user (recover_slot op) that a member failed.
 * Skipped when the member left cleanly: an intact comms connection
 * whose sequence number still matches the one recorded at add time
 * means the node departed via dlm_release_lockspace, not by failing.
 */
static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
    struct dlm_slot slot;
    uint32_t seq;
    int error;

    if (!ls->ls_ops || !ls->ls_ops->recover_slot)
        return;

    /* if there is no comms connection with this node
       or the present comms connection is newer
       than the one when this member was added, then
       we consider the node to have failed (versus
       being removed due to dlm_release_lockspace) */

    error = dlm_comm_seq(memb->nodeid, &seq);

    if (!error && seq == memb->comm_seq)
        return;

    slot.nodeid = memb->nodeid;
    slot.slot = memb->slot;

    ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}
0486 
/*
 * Notify the lockspace user that recovery finished, passing a snapshot
 * of the current members' nodeids and slots plus our own slot and the
 * generation.  Best-effort: returns silently if no recover_done op was
 * registered or the snapshot allocation fails.
 */
void dlm_lsop_recover_done(struct dlm_ls *ls)
{
    struct dlm_member *memb;
    struct dlm_slot *slots;
    int i, num;

    if (!ls->ls_ops || !ls->ls_ops->recover_done)
        return;

    num = ls->ls_num_nodes;
    slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
    if (!slots)
        return;

    i = 0;
    list_for_each_entry(memb, &ls->ls_nodes, list) {
        /* guard against ls_nodes holding more entries than
           ls_num_nodes claims */
        if (i == num) {
            log_error(ls, "dlm_lsop_recover_done bad num %d", num);
            goto out;
        }
        slots[i].nodeid = memb->nodeid;
        slots[i].slot = memb->slot;
        i++;
    }

    ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
                 ls->ls_slot, ls->ls_generation);
 out:
    kfree(slots);
}
0517 
0518 static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
0519                         int nodeid)
0520 {
0521     int i;
0522 
0523     for (i = 0; i < rv->nodes_count; i++) {
0524         if (rv->nodes[i].nodeid == nodeid)
0525             return &rv->nodes[i];
0526     }
0527     return NULL;
0528 }
0529 
0530 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
0531 {
0532     struct dlm_member *memb, *safe;
0533     struct dlm_config_node *node;
0534     int i, error, neg = 0, low = -1;
0535 
0536     /* previously removed members that we've not finished removing need to
0537      * count as a negative change so the "neg" recovery steps will happen
0538      *
0539      * This functionality must report all member changes to lsops or
0540      * midcomms layer and must never return before.
0541      */
0542 
0543     list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
0544         log_rinfo(ls, "prev removed member %d", memb->nodeid);
0545         neg++;
0546     }
0547 
0548     /* move departed members from ls_nodes to ls_nodes_gone */
0549 
0550     list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
0551         node = find_config_node(rv, memb->nodeid);
0552         if (node && !node->new)
0553             continue;
0554 
0555         if (!node) {
0556             log_rinfo(ls, "remove member %d", memb->nodeid);
0557         } else {
0558             /* removed and re-added */
0559             log_rinfo(ls, "remove member %d comm_seq %u %u",
0560                   memb->nodeid, memb->comm_seq, node->comm_seq);
0561         }
0562 
0563         neg++;
0564         list_move(&memb->list, &ls->ls_nodes_gone);
0565         dlm_midcomms_remove_member(memb->nodeid);
0566         ls->ls_num_nodes--;
0567         dlm_lsop_recover_slot(ls, memb);
0568     }
0569 
0570     /* add new members to ls_nodes */
0571 
0572     for (i = 0; i < rv->nodes_count; i++) {
0573         node = &rv->nodes[i];
0574         if (dlm_is_member(ls, node->nodeid))
0575             continue;
0576         dlm_add_member(ls, node);
0577         log_rinfo(ls, "add member %d", node->nodeid);
0578     }
0579 
0580     list_for_each_entry(memb, &ls->ls_nodes, list) {
0581         if (low == -1 || memb->nodeid < low)
0582             low = memb->nodeid;
0583     }
0584     ls->ls_low_nodeid = low;
0585 
0586     make_member_array(ls);
0587     *neg_out = neg;
0588 
0589     error = ping_members(ls);
0590     log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
0591     return error;
0592 }
0593 
/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

/*
 * Stop normal locking activity in preparation for recovery: quiesce
 * dlm_recv, set RECOVER_STOP / clear RUNNING, discard the old slot
 * table, and on the first stop of a recovery cycle notify the
 * lockspace user via recover_prep.  Always returns 0.
 */
int dlm_ls_stop(struct dlm_ls *ls)
{
    int new;  /* nonzero if RUNNING was set, i.e. this is the first stop */

    /*
     * Prevent dlm_recv from being in the middle of something when we do
     * the stop.  This includes ensuring dlm_recv isn't processing a
     * recovery message (rcom), while dlm_recoverd is aborting and
     * resetting things from an in-progress recovery.  i.e. we want
     * dlm_recoverd to abort its recovery without worrying about dlm_recv
     * processing an rcom at the same time.  Stopping dlm_recv also makes
     * it easy for dlm_receive_message() to check locking stopped and add a
     * message to the requestqueue without races.
     */

    down_write(&ls->ls_recv_active);

    /*
     * Abort any recovery that's in progress (see RECOVER_STOP,
     * dlm_recovery_stopped()) and tell any other threads running in the
     * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
     */

    spin_lock(&ls->ls_recover_lock);
    set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
    new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
    ls->ls_recover_seq++;
    spin_unlock(&ls->ls_recover_lock);

    /*
     * Let dlm_recv run again, now any normal messages will be saved on the
     * requestqueue for later.
     */

    up_write(&ls->ls_recv_active);

    /*
     * This in_recovery lock does two things:
     * 1) Keeps this function from returning until all threads are out
     *    of locking routines and locking is truly stopped.
     * 2) Keeps any new requests from being processed until it's unlocked
     *    when recovery is complete.
     */

    if (new) {
        set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
        wake_up_process(ls->ls_recoverd_task);
        /* wait for dlm_recoverd to take the in_recovery lock
           (signalled via LSFL_RECOVER_LOCK) */
        wait_event(ls->ls_recover_lock_wait,
               test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
    }

    /*
     * The recoverd suspend/resume makes sure that dlm_recoverd (if
     * running) has noticed RECOVER_STOP above and quit processing the
     * previous recovery.
     */

    dlm_recoverd_suspend(ls);

    /* the old slot table is invalid for the next recovery */
    spin_lock(&ls->ls_recover_lock);
    kfree(ls->ls_slots);
    ls->ls_slots = NULL;
    ls->ls_num_slots = 0;
    ls->ls_slots_size = 0;
    ls->ls_recover_status = 0;
    spin_unlock(&ls->ls_recover_lock);

    dlm_recoverd_resume(ls);

    if (!ls->ls_recover_begin)
        ls->ls_recover_begin = jiffies;

    /* call recover_prep ops only once and not multiple times
     * for each possible dlm_ls_stop() when recovery is already
     * stopped.
     *
     * If we successful was able to clear LSFL_RUNNING bit and
     * it was set we know it is the first dlm_ls_stop() call.
     */
    if (new)
        dlm_lsop_recover_prep(ls);

    return 0;
}
0681 
/*
 * Start a new recovery using the current node configuration from
 * userspace: stash a dlm_recover (node list + recovery sequence) in
 * ls_recover_args and wake dlm_recoverd to process it.  Returns 0 on
 * success, or a negative errno if the configuration can't be read or
 * the lockspace was not stopped first.
 */
int dlm_ls_start(struct dlm_ls *ls)
{
    struct dlm_recover *rv, *rv_old;
    struct dlm_config_node *nodes = NULL;
    int error, count;

    rv = kzalloc(sizeof(*rv), GFP_NOFS);
    if (!rv)
        return -ENOMEM;

    error = dlm_config_nodes(ls->ls_name, &nodes, &count);
    if (error < 0)
        goto fail_rv;

    spin_lock(&ls->ls_recover_lock);

    /* the lockspace needs to be stopped before it can be started */

    if (!dlm_locking_stopped(ls)) {
        spin_unlock(&ls->ls_recover_lock);
        log_error(ls, "start ignored: lockspace running");
        error = -EINVAL;
        goto fail;
    }

    rv->nodes = nodes;
    rv->nodes_count = count;
    rv->seq = ++ls->ls_recover_seq;
    rv_old = ls->ls_recover_args;
    ls->ls_recover_args = rv;
    spin_unlock(&ls->ls_recover_lock);

    /* a previous start that dlm_recoverd never consumed; drop it */
    if (rv_old) {
        log_error(ls, "unused recovery %llx %d",
              (unsigned long long)rv_old->seq, rv_old->nodes_count);
        kfree(rv_old->nodes);
        kfree(rv_old);
    }

    set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
    wake_up_process(ls->ls_recoverd_task);
    return 0;

 fail:
    kfree(nodes);
 fail_rv:
    kfree(rv);
    return error;
}
0731