0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #include "dlm_internal.h"
0013 #include "lockspace.h"
0014 #include "member.h"
0015 #include "lowcomms.h"
0016 #include "rcom.h"
0017 #include "config.h"
0018 #include "memory.h"
0019 #include "recover.h"
0020 #include "util.h"
0021 #include "lock.h"
0022 #include "dir.h"
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
0034 {
0035 uint32_t node;
0036
0037 if (ls->ls_num_nodes == 1)
0038 return dlm_our_nodeid();
0039 else {
0040 node = (hash >> 16) % ls->ls_total_weight;
0041 return ls->ls_node_array[node];
0042 }
0043 }
0044
0045 int dlm_dir_nodeid(struct dlm_rsb *r)
0046 {
0047 return r->res_dir_nodeid;
0048 }
0049
0050 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
0051 {
0052 struct dlm_rsb *r;
0053
0054 down_read(&ls->ls_root_sem);
0055 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
0056 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
0057 }
0058 up_read(&ls->ls_root_sem);
0059 }
0060
0061 int dlm_recover_directory(struct dlm_ls *ls)
0062 {
0063 struct dlm_member *memb;
0064 char *b, *last_name = NULL;
0065 int error = -ENOMEM, last_len, nodeid, result;
0066 uint16_t namelen;
0067 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
0068
0069 log_rinfo(ls, "dlm_recover_directory");
0070
0071 if (dlm_no_directory(ls))
0072 goto out_status;
0073
0074 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
0075 if (!last_name)
0076 goto out;
0077
0078 list_for_each_entry(memb, &ls->ls_nodes, list) {
0079 if (memb->nodeid == dlm_our_nodeid())
0080 continue;
0081
0082 memset(last_name, 0, DLM_RESNAME_MAXLEN);
0083 last_len = 0;
0084
0085 for (;;) {
0086 int left;
0087 if (dlm_recovery_stopped(ls)) {
0088 error = -EINTR;
0089 goto out_free;
0090 }
0091
0092 error = dlm_rcom_names(ls, memb->nodeid,
0093 last_name, last_len);
0094 if (error)
0095 goto out_free;
0096
0097 cond_resched();
0098
0099
0100
0101
0102
0103 b = ls->ls_recover_buf->rc_buf;
0104 left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
0105 left -= sizeof(struct dlm_rcom);
0106
0107 for (;;) {
0108 __be16 v;
0109
0110 error = -EINVAL;
0111 if (left < sizeof(__be16))
0112 goto out_free;
0113
0114 memcpy(&v, b, sizeof(__be16));
0115 namelen = be16_to_cpu(v);
0116 b += sizeof(__be16);
0117 left -= sizeof(__be16);
0118
0119
0120
0121
0122
0123 if (namelen == 0xFFFF)
0124 goto done;
0125 if (!namelen)
0126 break;
0127
0128 if (namelen > left)
0129 goto out_free;
0130
0131 if (namelen > DLM_RESNAME_MAXLEN)
0132 goto out_free;
0133
0134 error = dlm_master_lookup(ls, memb->nodeid,
0135 b, namelen,
0136 DLM_LU_RECOVER_DIR,
0137 &nodeid, &result);
0138 if (error) {
0139 log_error(ls, "recover_dir lookup %d",
0140 error);
0141 goto out_free;
0142 }
0143
0144
0145
0146
0147
0148
0149 if (result == DLM_LU_MATCH &&
0150 nodeid != memb->nodeid) {
0151 count_bad++;
0152 log_error(ls, "recover_dir lookup %d "
0153 "nodeid %d memb %d bad %u",
0154 result, nodeid, memb->nodeid,
0155 count_bad);
0156 print_hex_dump_bytes("dlm_recover_dir ",
0157 DUMP_PREFIX_NONE,
0158 b, namelen);
0159 }
0160
0161
0162
0163
0164 if (result == DLM_LU_MATCH &&
0165 nodeid == memb->nodeid) {
0166 count_match++;
0167 }
0168
0169
0170
0171
0172 if (result == DLM_LU_ADD) {
0173 count_add++;
0174 }
0175
0176 last_len = namelen;
0177 memcpy(last_name, b, namelen);
0178 b += namelen;
0179 left -= namelen;
0180 count++;
0181 }
0182 }
0183 done:
0184 ;
0185 }
0186
0187 out_status:
0188 error = 0;
0189 dlm_set_recover_status(ls, DLM_RS_DIR);
0190
0191 log_rinfo(ls, "dlm_recover_directory %u in %u new",
0192 count, count_add);
0193 out_free:
0194 kfree(last_name);
0195 out:
0196 return error;
0197 }
0198
0199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
0200 {
0201 struct dlm_rsb *r;
0202 uint32_t hash, bucket;
0203 int rv;
0204
0205 hash = jhash(name, len, 0);
0206 bucket = hash & (ls->ls_rsbtbl_size - 1);
0207
0208 spin_lock(&ls->ls_rsbtbl[bucket].lock);
0209 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
0210 if (rv)
0211 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
0212 name, len, &r);
0213 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
0214
0215 if (!rv)
0216 return r;
0217
0218 down_read(&ls->ls_root_sem);
0219 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
0220 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
0221 up_read(&ls->ls_root_sem);
0222 log_debug(ls, "find_rsb_root revert to root_list %s",
0223 r->res_name);
0224 return r;
0225 }
0226 }
0227 up_read(&ls->ls_root_sem);
0228 return NULL;
0229 }
0230
0231
0232
0233
0234
0235 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
0236 char *outbuf, int outlen, int nodeid)
0237 {
0238 struct list_head *list;
0239 struct dlm_rsb *r;
0240 int offset = 0, dir_nodeid;
0241 __be16 be_namelen;
0242
0243 down_read(&ls->ls_root_sem);
0244
0245 if (inlen > 1) {
0246 r = find_rsb_root(ls, inbuf, inlen);
0247 if (!r) {
0248 inbuf[inlen - 1] = '\0';
0249 log_error(ls, "copy_master_names from %d start %d %s",
0250 nodeid, inlen, inbuf);
0251 goto out;
0252 }
0253 list = r->res_root_list.next;
0254 } else {
0255 list = ls->ls_root_list.next;
0256 }
0257
0258 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
0259 r = list_entry(list, struct dlm_rsb, res_root_list);
0260 if (r->res_nodeid)
0261 continue;
0262
0263 dir_nodeid = dlm_dir_nodeid(r);
0264 if (dir_nodeid != nodeid)
0265 continue;
0266
0267
0268
0269
0270
0271
0272
0273
0274
0275 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
0276
0277 be_namelen = cpu_to_be16(0);
0278 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
0279 offset += sizeof(__be16);
0280 ls->ls_recover_dir_sent_msg++;
0281 goto out;
0282 }
0283
0284 be_namelen = cpu_to_be16(r->res_length);
0285 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
0286 offset += sizeof(__be16);
0287 memcpy(outbuf + offset, r->res_name, r->res_length);
0288 offset += r->res_length;
0289 ls->ls_recover_dir_sent_res++;
0290 }
0291
0292
0293
0294
0295
0296
0297 if ((list == &ls->ls_root_list) &&
0298 (offset + sizeof(uint16_t) <= outlen)) {
0299 be_namelen = cpu_to_be16(0xFFFF);
0300 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
0301 offset += sizeof(__be16);
0302 ls->ls_recover_dir_sent_msg++;
0303 }
0304 out:
0305 up_read(&ls->ls_root_sem);
0306 }
0307