0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "dlm_internal.h"
0012 #include "lockspace.h"
0013 #include "member.h"
0014 #include "recoverd.h"
0015 #include "recover.h"
0016 #include "rcom.h"
0017 #include "config.h"
0018 #include "midcomms.h"
0019 #include "lowcomms.h"
0020
0021 int dlm_slots_version(struct dlm_header *h)
0022 {
0023 if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
0024 return 0;
0025 return 1;
0026 }
0027
0028 void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
0029 struct dlm_member *memb)
0030 {
0031 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
0032
0033 if (!dlm_slots_version(&rc->rc_header))
0034 return;
0035
0036 memb->slot = le16_to_cpu(rf->rf_our_slot);
0037 memb->generation = le32_to_cpu(rf->rf_generation);
0038 }
0039
0040 void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
0041 {
0042 struct dlm_slot *slot;
0043 struct rcom_slot *ro;
0044 int i;
0045
0046 ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
0047
0048
0049
0050 for (i = 0; i < ls->ls_slots_size; i++) {
0051 slot = &ls->ls_slots[i];
0052 if (!slot->nodeid)
0053 continue;
0054 ro->ro_nodeid = cpu_to_le32(slot->nodeid);
0055 ro->ro_slot = cpu_to_le16(slot->slot);
0056 ro++;
0057 }
0058 }
0059
0060 #define SLOT_DEBUG_LINE 128
0061
0062 static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
0063 struct rcom_slot *ro0, struct dlm_slot *array,
0064 int array_size)
0065 {
0066 char line[SLOT_DEBUG_LINE];
0067 int len = SLOT_DEBUG_LINE - 1;
0068 int pos = 0;
0069 int ret, i;
0070
0071 memset(line, 0, sizeof(line));
0072
0073 if (array) {
0074 for (i = 0; i < array_size; i++) {
0075 if (!array[i].nodeid)
0076 continue;
0077
0078 ret = snprintf(line + pos, len - pos, " %d:%d",
0079 array[i].slot, array[i].nodeid);
0080 if (ret >= len - pos)
0081 break;
0082 pos += ret;
0083 }
0084 } else if (ro0) {
0085 for (i = 0; i < num_slots; i++) {
0086 ret = snprintf(line + pos, len - pos, " %d:%d",
0087 ro0[i].ro_slot, ro0[i].ro_nodeid);
0088 if (ret >= len - pos)
0089 break;
0090 pos += ret;
0091 }
0092 }
0093
0094 log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
0095 }
0096
0097 int dlm_slots_copy_in(struct dlm_ls *ls)
0098 {
0099 struct dlm_member *memb;
0100 struct dlm_rcom *rc = ls->ls_recover_buf;
0101 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
0102 struct rcom_slot *ro0, *ro;
0103 int our_nodeid = dlm_our_nodeid();
0104 int i, num_slots;
0105 uint32_t gen;
0106
0107 if (!dlm_slots_version(&rc->rc_header))
0108 return -1;
0109
0110 gen = le32_to_cpu(rf->rf_generation);
0111 if (gen <= ls->ls_generation) {
0112 log_error(ls, "dlm_slots_copy_in gen %u old %u",
0113 gen, ls->ls_generation);
0114 }
0115 ls->ls_generation = gen;
0116
0117 num_slots = le16_to_cpu(rf->rf_num_slots);
0118 if (!num_slots)
0119 return -1;
0120
0121 ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
0122
0123 log_slots(ls, gen, num_slots, ro0, NULL, 0);
0124
0125 list_for_each_entry(memb, &ls->ls_nodes, list) {
0126 for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
0127 if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
0128 continue;
0129 memb->slot = le16_to_cpu(ro->ro_slot);
0130 memb->slot_prev = memb->slot;
0131 break;
0132 }
0133
0134 if (memb->nodeid == our_nodeid) {
0135 if (ls->ls_slot && ls->ls_slot != memb->slot) {
0136 log_error(ls, "dlm_slots_copy_in our slot "
0137 "changed %d %d", ls->ls_slot,
0138 memb->slot);
0139 return -1;
0140 }
0141
0142 if (!ls->ls_slot)
0143 ls->ls_slot = memb->slot;
0144 }
0145
0146 if (!memb->slot) {
0147 log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
0148 memb->nodeid);
0149 return -1;
0150 }
0151 }
0152
0153 return 0;
0154 }
0155
0156
0157
0158
0159
0160 int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
0161 struct dlm_slot **slots_out, uint32_t *gen_out)
0162 {
0163 struct dlm_member *memb;
0164 struct dlm_slot *array;
0165 int our_nodeid = dlm_our_nodeid();
0166 int array_size, max_slots, i;
0167 int need = 0;
0168 int max = 0;
0169 int num = 0;
0170 uint32_t gen = 0;
0171
0172
0173
0174 list_for_each_entry(memb, &ls->ls_nodes, list) {
0175 if (memb->nodeid == our_nodeid) {
0176 memb->slot = ls->ls_slot;
0177 memb->generation = ls->ls_generation;
0178 break;
0179 }
0180 }
0181
0182 list_for_each_entry(memb, &ls->ls_nodes, list) {
0183 if (memb->generation > gen)
0184 gen = memb->generation;
0185
0186
0187
0188 if (memb->slot == -1)
0189 return -1;
0190
0191
0192
0193 if (!memb->slot)
0194 need++;
0195
0196
0197
0198 num++;
0199
0200 if (!max || max < memb->slot)
0201 max = memb->slot;
0202
0203
0204
0205 if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
0206 log_error(ls, "nodeid %d slot changed %d %d",
0207 memb->nodeid, memb->slot_prev, memb->slot);
0208 return -1;
0209 }
0210 memb->slot_prev = memb->slot;
0211 }
0212
0213 array_size = max + need;
0214 array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
0215 if (!array)
0216 return -ENOMEM;
0217
0218 num = 0;
0219
0220
0221
0222 list_for_each_entry(memb, &ls->ls_nodes, list) {
0223 if (!memb->slot)
0224 continue;
0225
0226 if (memb->slot > array_size) {
0227 log_error(ls, "invalid slot number %d", memb->slot);
0228 kfree(array);
0229 return -1;
0230 }
0231
0232 array[memb->slot - 1].nodeid = memb->nodeid;
0233 array[memb->slot - 1].slot = memb->slot;
0234 num++;
0235 }
0236
0237
0238
0239 list_for_each_entry(memb, &ls->ls_nodes, list) {
0240 if (memb->slot)
0241 continue;
0242
0243 for (i = 0; i < array_size; i++) {
0244 if (array[i].nodeid)
0245 continue;
0246
0247 memb->slot = i + 1;
0248 memb->slot_prev = memb->slot;
0249 array[i].nodeid = memb->nodeid;
0250 array[i].slot = memb->slot;
0251 num++;
0252
0253 if (!ls->ls_slot && memb->nodeid == our_nodeid)
0254 ls->ls_slot = memb->slot;
0255 break;
0256 }
0257
0258 if (!memb->slot) {
0259 log_error(ls, "no free slot found");
0260 kfree(array);
0261 return -1;
0262 }
0263 }
0264
0265 gen++;
0266
0267 log_slots(ls, gen, num, NULL, array, array_size);
0268
0269 max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
0270 sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
0271
0272 if (num > max_slots) {
0273 log_error(ls, "num_slots %d exceeds max_slots %d",
0274 num, max_slots);
0275 kfree(array);
0276 return -1;
0277 }
0278
0279 *gen_out = gen;
0280 *slots_out = array;
0281 *slots_size = array_size;
0282 *num_slots = num;
0283 return 0;
0284 }
0285
0286 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
0287 {
0288 struct dlm_member *memb = NULL;
0289 struct list_head *tmp;
0290 struct list_head *newlist = &new->list;
0291 struct list_head *head = &ls->ls_nodes;
0292
0293 list_for_each(tmp, head) {
0294 memb = list_entry(tmp, struct dlm_member, list);
0295 if (new->nodeid < memb->nodeid)
0296 break;
0297 }
0298
0299 if (!memb)
0300 list_add_tail(newlist, head);
0301 else {
0302
0303 newlist->prev = tmp->prev;
0304 newlist->next = tmp;
0305 tmp->prev->next = newlist;
0306 tmp->prev = newlist;
0307 }
0308 }
0309
0310 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
0311 {
0312 struct dlm_member *memb;
0313 int error;
0314
0315 memb = kzalloc(sizeof(*memb), GFP_NOFS);
0316 if (!memb)
0317 return -ENOMEM;
0318
0319 error = dlm_lowcomms_connect_node(node->nodeid);
0320 if (error < 0) {
0321 kfree(memb);
0322 return error;
0323 }
0324
0325 memb->nodeid = node->nodeid;
0326 memb->weight = node->weight;
0327 memb->comm_seq = node->comm_seq;
0328 dlm_midcomms_add_member(node->nodeid);
0329 add_ordered_member(ls, memb);
0330 ls->ls_num_nodes++;
0331 return 0;
0332 }
0333
0334 static struct dlm_member *find_memb(struct list_head *head, int nodeid)
0335 {
0336 struct dlm_member *memb;
0337
0338 list_for_each_entry(memb, head, list) {
0339 if (memb->nodeid == nodeid)
0340 return memb;
0341 }
0342 return NULL;
0343 }
0344
0345 int dlm_is_member(struct dlm_ls *ls, int nodeid)
0346 {
0347 if (find_memb(&ls->ls_nodes, nodeid))
0348 return 1;
0349 return 0;
0350 }
0351
0352 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
0353 {
0354 if (find_memb(&ls->ls_nodes_gone, nodeid))
0355 return 1;
0356 return 0;
0357 }
0358
0359 static void clear_memb_list(struct list_head *head,
0360 void (*after_del)(int nodeid))
0361 {
0362 struct dlm_member *memb;
0363
0364 while (!list_empty(head)) {
0365 memb = list_entry(head->next, struct dlm_member, list);
0366 list_del(&memb->list);
0367 if (after_del)
0368 after_del(memb->nodeid);
0369 kfree(memb);
0370 }
0371 }
0372
0373 static void clear_members_cb(int nodeid)
0374 {
0375 dlm_midcomms_remove_member(nodeid);
0376 }
0377
0378 void dlm_clear_members(struct dlm_ls *ls)
0379 {
0380 clear_memb_list(&ls->ls_nodes, clear_members_cb);
0381 ls->ls_num_nodes = 0;
0382 }
0383
0384 void dlm_clear_members_gone(struct dlm_ls *ls)
0385 {
0386 clear_memb_list(&ls->ls_nodes_gone, NULL);
0387 }
0388
0389 static void make_member_array(struct dlm_ls *ls)
0390 {
0391 struct dlm_member *memb;
0392 int i, w, x = 0, total = 0, all_zero = 0, *array;
0393
0394 kfree(ls->ls_node_array);
0395 ls->ls_node_array = NULL;
0396
0397 list_for_each_entry(memb, &ls->ls_nodes, list) {
0398 if (memb->weight)
0399 total += memb->weight;
0400 }
0401
0402
0403
0404 if (!total) {
0405 total = ls->ls_num_nodes;
0406 all_zero = 1;
0407 }
0408
0409 ls->ls_total_weight = total;
0410 array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
0411 if (!array)
0412 return;
0413
0414 list_for_each_entry(memb, &ls->ls_nodes, list) {
0415 if (!all_zero && !memb->weight)
0416 continue;
0417
0418 if (all_zero)
0419 w = 1;
0420 else
0421 w = memb->weight;
0422
0423 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
0424
0425 for (i = 0; i < w; i++)
0426 array[x++] = memb->nodeid;
0427 }
0428
0429 ls->ls_node_array = array;
0430 }
0431
0432
0433
0434 static int ping_members(struct dlm_ls *ls)
0435 {
0436 struct dlm_member *memb;
0437 int error = 0;
0438
0439 list_for_each_entry(memb, &ls->ls_nodes, list) {
0440 if (dlm_recovery_stopped(ls)) {
0441 error = -EINTR;
0442 break;
0443 }
0444 error = dlm_rcom_status(ls, memb->nodeid, 0);
0445 if (error)
0446 break;
0447 }
0448 if (error)
0449 log_rinfo(ls, "ping_members aborted %d last nodeid %d",
0450 error, ls->ls_recover_nodeid);
0451 return error;
0452 }
0453
0454 static void dlm_lsop_recover_prep(struct dlm_ls *ls)
0455 {
0456 if (!ls->ls_ops || !ls->ls_ops->recover_prep)
0457 return;
0458 ls->ls_ops->recover_prep(ls->ls_ops_arg);
0459 }
0460
0461 static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
0462 {
0463 struct dlm_slot slot;
0464 uint32_t seq;
0465 int error;
0466
0467 if (!ls->ls_ops || !ls->ls_ops->recover_slot)
0468 return;
0469
0470
0471
0472
0473
0474
0475
0476 error = dlm_comm_seq(memb->nodeid, &seq);
0477
0478 if (!error && seq == memb->comm_seq)
0479 return;
0480
0481 slot.nodeid = memb->nodeid;
0482 slot.slot = memb->slot;
0483
0484 ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
0485 }
0486
0487 void dlm_lsop_recover_done(struct dlm_ls *ls)
0488 {
0489 struct dlm_member *memb;
0490 struct dlm_slot *slots;
0491 int i, num;
0492
0493 if (!ls->ls_ops || !ls->ls_ops->recover_done)
0494 return;
0495
0496 num = ls->ls_num_nodes;
0497 slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
0498 if (!slots)
0499 return;
0500
0501 i = 0;
0502 list_for_each_entry(memb, &ls->ls_nodes, list) {
0503 if (i == num) {
0504 log_error(ls, "dlm_lsop_recover_done bad num %d", num);
0505 goto out;
0506 }
0507 slots[i].nodeid = memb->nodeid;
0508 slots[i].slot = memb->slot;
0509 i++;
0510 }
0511
0512 ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
0513 ls->ls_slot, ls->ls_generation);
0514 out:
0515 kfree(slots);
0516 }
0517
0518 static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
0519 int nodeid)
0520 {
0521 int i;
0522
0523 for (i = 0; i < rv->nodes_count; i++) {
0524 if (rv->nodes[i].nodeid == nodeid)
0525 return &rv->nodes[i];
0526 }
0527 return NULL;
0528 }
0529
0530 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
0531 {
0532 struct dlm_member *memb, *safe;
0533 struct dlm_config_node *node;
0534 int i, error, neg = 0, low = -1;
0535
0536
0537
0538
0539
0540
0541
0542
0543 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
0544 log_rinfo(ls, "prev removed member %d", memb->nodeid);
0545 neg++;
0546 }
0547
0548
0549
0550 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
0551 node = find_config_node(rv, memb->nodeid);
0552 if (node && !node->new)
0553 continue;
0554
0555 if (!node) {
0556 log_rinfo(ls, "remove member %d", memb->nodeid);
0557 } else {
0558
0559 log_rinfo(ls, "remove member %d comm_seq %u %u",
0560 memb->nodeid, memb->comm_seq, node->comm_seq);
0561 }
0562
0563 neg++;
0564 list_move(&memb->list, &ls->ls_nodes_gone);
0565 dlm_midcomms_remove_member(memb->nodeid);
0566 ls->ls_num_nodes--;
0567 dlm_lsop_recover_slot(ls, memb);
0568 }
0569
0570
0571
0572 for (i = 0; i < rv->nodes_count; i++) {
0573 node = &rv->nodes[i];
0574 if (dlm_is_member(ls, node->nodeid))
0575 continue;
0576 dlm_add_member(ls, node);
0577 log_rinfo(ls, "add member %d", node->nodeid);
0578 }
0579
0580 list_for_each_entry(memb, &ls->ls_nodes, list) {
0581 if (low == -1 || memb->nodeid < low)
0582 low = memb->nodeid;
0583 }
0584 ls->ls_low_nodeid = low;
0585
0586 make_member_array(ls);
0587 *neg_out = neg;
0588
0589 error = ping_members(ls);
0590 log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
0591 return error;
0592 }
0593
0594
0595
0596
0597 int dlm_ls_stop(struct dlm_ls *ls)
0598 {
0599 int new;
0600
0601
0602
0603
0604
0605
0606
0607
0608
0609
0610
0611
0612 down_write(&ls->ls_recv_active);
0613
0614
0615
0616
0617
0618
0619
0620 spin_lock(&ls->ls_recover_lock);
0621 set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
0622 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
0623 ls->ls_recover_seq++;
0624 spin_unlock(&ls->ls_recover_lock);
0625
0626
0627
0628
0629
0630
0631 up_write(&ls->ls_recv_active);
0632
0633
0634
0635
0636
0637
0638
0639
0640
0641 if (new) {
0642 set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
0643 wake_up_process(ls->ls_recoverd_task);
0644 wait_event(ls->ls_recover_lock_wait,
0645 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
0646 }
0647
0648
0649
0650
0651
0652
0653
0654 dlm_recoverd_suspend(ls);
0655
0656 spin_lock(&ls->ls_recover_lock);
0657 kfree(ls->ls_slots);
0658 ls->ls_slots = NULL;
0659 ls->ls_num_slots = 0;
0660 ls->ls_slots_size = 0;
0661 ls->ls_recover_status = 0;
0662 spin_unlock(&ls->ls_recover_lock);
0663
0664 dlm_recoverd_resume(ls);
0665
0666 if (!ls->ls_recover_begin)
0667 ls->ls_recover_begin = jiffies;
0668
0669
0670
0671
0672
0673
0674
0675
0676 if (new)
0677 dlm_lsop_recover_prep(ls);
0678
0679 return 0;
0680 }
0681
0682 int dlm_ls_start(struct dlm_ls *ls)
0683 {
0684 struct dlm_recover *rv, *rv_old;
0685 struct dlm_config_node *nodes = NULL;
0686 int error, count;
0687
0688 rv = kzalloc(sizeof(*rv), GFP_NOFS);
0689 if (!rv)
0690 return -ENOMEM;
0691
0692 error = dlm_config_nodes(ls->ls_name, &nodes, &count);
0693 if (error < 0)
0694 goto fail_rv;
0695
0696 spin_lock(&ls->ls_recover_lock);
0697
0698
0699
0700 if (!dlm_locking_stopped(ls)) {
0701 spin_unlock(&ls->ls_recover_lock);
0702 log_error(ls, "start ignored: lockspace running");
0703 error = -EINVAL;
0704 goto fail;
0705 }
0706
0707 rv->nodes = nodes;
0708 rv->nodes_count = count;
0709 rv->seq = ++ls->ls_recover_seq;
0710 rv_old = ls->ls_recover_args;
0711 ls->ls_recover_args = rv;
0712 spin_unlock(&ls->ls_recover_lock);
0713
0714 if (rv_old) {
0715 log_error(ls, "unused recovery %llx %d",
0716 (unsigned long long)rv_old->seq, rv_old->nodes_count);
0717 kfree(rv_old->nodes);
0718 kfree(rv_old);
0719 }
0720
0721 set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
0722 wake_up_process(ls->ls_recoverd_task);
0723 return 0;
0724
0725 fail:
0726 kfree(nodes);
0727 fail_rv:
0728 kfree(rv);
0729 return error;
0730 }
0731