Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /******************************************************************************
0003 *******************************************************************************
0004 **
0005 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
0006 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
0007 **
0008 **
0009 *******************************************************************************
0010 ******************************************************************************/
0011 
0012 #include "dlm_internal.h"
0013 #include "lockspace.h"
0014 #include "member.h"
0015 #include "lowcomms.h"
0016 #include "rcom.h"
0017 #include "config.h"
0018 #include "memory.h"
0019 #include "recover.h"
0020 #include "util.h"
0021 #include "lock.h"
0022 #include "dir.h"
0023 
0024 /*
0025  * We use the upper 16 bits of the hash value to select the directory node.
0026  * Low bits are used for distribution of rsb's among hash buckets on each node.
0027  *
0028  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
0029  * num_nodes to the hash value.  This value in the desired range is used as an
0030  * offset into the sorted list of nodeid's to give the particular nodeid.
0031  */
0032 
0033 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
0034 {
0035     uint32_t node;
0036 
0037     if (ls->ls_num_nodes == 1)
0038         return dlm_our_nodeid();
0039     else {
0040         node = (hash >> 16) % ls->ls_total_weight;
0041         return ls->ls_node_array[node];
0042     }
0043 }
0044 
0045 int dlm_dir_nodeid(struct dlm_rsb *r)
0046 {
0047     return r->res_dir_nodeid;
0048 }
0049 
0050 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
0051 {
0052     struct dlm_rsb *r;
0053 
0054     down_read(&ls->ls_root_sem);
0055     list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
0056         r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
0057     }
0058     up_read(&ls->ls_root_sem);
0059 }
0060 
0061 int dlm_recover_directory(struct dlm_ls *ls)
0062 {
0063     struct dlm_member *memb;
0064     char *b, *last_name = NULL;
0065     int error = -ENOMEM, last_len, nodeid, result;
0066     uint16_t namelen;
0067     unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
0068 
0069     log_rinfo(ls, "dlm_recover_directory");
0070 
0071     if (dlm_no_directory(ls))
0072         goto out_status;
0073 
0074     last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
0075     if (!last_name)
0076         goto out;
0077 
0078     list_for_each_entry(memb, &ls->ls_nodes, list) {
0079         if (memb->nodeid == dlm_our_nodeid())
0080             continue;
0081 
0082         memset(last_name, 0, DLM_RESNAME_MAXLEN);
0083         last_len = 0;
0084 
0085         for (;;) {
0086             int left;
0087             if (dlm_recovery_stopped(ls)) {
0088                 error = -EINTR;
0089                 goto out_free;
0090             }
0091 
0092             error = dlm_rcom_names(ls, memb->nodeid,
0093                            last_name, last_len);
0094             if (error)
0095                 goto out_free;
0096 
0097             cond_resched();
0098 
0099             /*
0100              * pick namelen/name pairs out of received buffer
0101              */
0102 
0103             b = ls->ls_recover_buf->rc_buf;
0104             left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
0105             left -= sizeof(struct dlm_rcom);
0106 
0107             for (;;) {
0108                 __be16 v;
0109 
0110                 error = -EINVAL;
0111                 if (left < sizeof(__be16))
0112                     goto out_free;
0113 
0114                 memcpy(&v, b, sizeof(__be16));
0115                 namelen = be16_to_cpu(v);
0116                 b += sizeof(__be16);
0117                 left -= sizeof(__be16);
0118 
0119                 /* namelen of 0xFFFFF marks end of names for
0120                    this node; namelen of 0 marks end of the
0121                    buffer */
0122 
0123                 if (namelen == 0xFFFF)
0124                     goto done;
0125                 if (!namelen)
0126                     break;
0127 
0128                 if (namelen > left)
0129                     goto out_free;
0130 
0131                 if (namelen > DLM_RESNAME_MAXLEN)
0132                     goto out_free;
0133 
0134                 error = dlm_master_lookup(ls, memb->nodeid,
0135                               b, namelen,
0136                               DLM_LU_RECOVER_DIR,
0137                               &nodeid, &result);
0138                 if (error) {
0139                     log_error(ls, "recover_dir lookup %d",
0140                           error);
0141                     goto out_free;
0142                 }
0143 
0144                 /* The name was found in rsbtbl, but the
0145                  * master nodeid is different from
0146                  * memb->nodeid which says it is the master.
0147                  * This should not happen. */
0148 
0149                 if (result == DLM_LU_MATCH &&
0150                     nodeid != memb->nodeid) {
0151                     count_bad++;
0152                     log_error(ls, "recover_dir lookup %d "
0153                           "nodeid %d memb %d bad %u",
0154                           result, nodeid, memb->nodeid,
0155                           count_bad);
0156                     print_hex_dump_bytes("dlm_recover_dir ",
0157                                  DUMP_PREFIX_NONE,
0158                                  b, namelen);
0159                 }
0160 
0161                 /* The name was found in rsbtbl, and the
0162                  * master nodeid matches memb->nodeid. */
0163 
0164                 if (result == DLM_LU_MATCH &&
0165                     nodeid == memb->nodeid) {
0166                     count_match++;
0167                 }
0168 
0169                 /* The name was not found in rsbtbl and was
0170                  * added with memb->nodeid as the master. */
0171 
0172                 if (result == DLM_LU_ADD) {
0173                     count_add++;
0174                 }
0175 
0176                 last_len = namelen;
0177                 memcpy(last_name, b, namelen);
0178                 b += namelen;
0179                 left -= namelen;
0180                 count++;
0181             }
0182         }
0183      done:
0184         ;
0185     }
0186 
0187  out_status:
0188     error = 0;
0189     dlm_set_recover_status(ls, DLM_RS_DIR);
0190 
0191     log_rinfo(ls, "dlm_recover_directory %u in %u new",
0192           count, count_add);
0193  out_free:
0194     kfree(last_name);
0195  out:
0196     return error;
0197 }
0198 
0199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
0200 {
0201     struct dlm_rsb *r;
0202     uint32_t hash, bucket;
0203     int rv;
0204 
0205     hash = jhash(name, len, 0);
0206     bucket = hash & (ls->ls_rsbtbl_size - 1);
0207 
0208     spin_lock(&ls->ls_rsbtbl[bucket].lock);
0209     rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
0210     if (rv)
0211         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
0212                      name, len, &r);
0213     spin_unlock(&ls->ls_rsbtbl[bucket].lock);
0214 
0215     if (!rv)
0216         return r;
0217 
0218     down_read(&ls->ls_root_sem);
0219     list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
0220         if (len == r->res_length && !memcmp(name, r->res_name, len)) {
0221             up_read(&ls->ls_root_sem);
0222             log_debug(ls, "find_rsb_root revert to root_list %s",
0223                   r->res_name);
0224             return r;
0225         }
0226     }
0227     up_read(&ls->ls_root_sem);
0228     return NULL;
0229 }
0230 
0231 /* Find the rsb where we left off (or start again), then send rsb names
0232    for rsb's we're master of and whose directory node matches the requesting
0233    node.  inbuf is the rsb name last sent, inlen is the name's length */
0234 
0235 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
0236                char *outbuf, int outlen, int nodeid)
0237 {
0238     struct list_head *list;
0239     struct dlm_rsb *r;
0240     int offset = 0, dir_nodeid;
0241     __be16 be_namelen;
0242 
0243     down_read(&ls->ls_root_sem);
0244 
0245     if (inlen > 1) {
0246         r = find_rsb_root(ls, inbuf, inlen);
0247         if (!r) {
0248             inbuf[inlen - 1] = '\0';
0249             log_error(ls, "copy_master_names from %d start %d %s",
0250                   nodeid, inlen, inbuf);
0251             goto out;
0252         }
0253         list = r->res_root_list.next;
0254     } else {
0255         list = ls->ls_root_list.next;
0256     }
0257 
0258     for (offset = 0; list != &ls->ls_root_list; list = list->next) {
0259         r = list_entry(list, struct dlm_rsb, res_root_list);
0260         if (r->res_nodeid)
0261             continue;
0262 
0263         dir_nodeid = dlm_dir_nodeid(r);
0264         if (dir_nodeid != nodeid)
0265             continue;
0266 
0267         /*
0268          * The block ends when we can't fit the following in the
0269          * remaining buffer space:
0270          * namelen (uint16_t) +
0271          * name (r->res_length) +
0272          * end-of-block record 0x0000 (uint16_t)
0273          */
0274 
0275         if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
0276             /* Write end-of-block record */
0277             be_namelen = cpu_to_be16(0);
0278             memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
0279             offset += sizeof(__be16);
0280             ls->ls_recover_dir_sent_msg++;
0281             goto out;
0282         }
0283 
0284         be_namelen = cpu_to_be16(r->res_length);
0285         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
0286         offset += sizeof(__be16);
0287         memcpy(outbuf + offset, r->res_name, r->res_length);
0288         offset += r->res_length;
0289         ls->ls_recover_dir_sent_res++;
0290     }
0291 
0292     /*
0293      * If we've reached the end of the list (and there's room) write a
0294      * terminating record.
0295      */
0296 
0297     if ((list == &ls->ls_root_list) &&
0298         (offset + sizeof(uint16_t) <= outlen)) {
0299         be_namelen = cpu_to_be16(0xFFFF);
0300         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
0301         offset += sizeof(__be16);
0302         ls->ls_recover_dir_sent_msg++;
0303     }
0304  out:
0305     up_read(&ls->ls_root_sem);
0306 }
0307