0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #include <linux/module.h>
0013
0014 #include "dlm_internal.h"
0015 #include "lockspace.h"
0016 #include "member.h"
0017 #include "recoverd.h"
0018 #include "dir.h"
0019 #include "midcomms.h"
0020 #include "lowcomms.h"
0021 #include "config.h"
0022 #include "memory.h"
0023 #include "lock.h"
0024 #include "recover.h"
0025 #include "requestqueue.h"
0026 #include "user.h"
0027 #include "ast.h"
0028
0029 static int ls_count;
0030 static struct mutex ls_lock;
0031 static struct list_head lslist;
0032 static spinlock_t lslist_lock;
0033 static struct task_struct * scand_task;
0034
0035
0036 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
0037 {
0038 ssize_t ret = len;
0039 int n;
0040 int rc = kstrtoint(buf, 0, &n);
0041
0042 if (rc)
0043 return rc;
0044 ls = dlm_find_lockspace_local(ls->ls_local_handle);
0045 if (!ls)
0046 return -EINVAL;
0047
0048 switch (n) {
0049 case 0:
0050 dlm_ls_stop(ls);
0051 break;
0052 case 1:
0053 dlm_ls_start(ls);
0054 break;
0055 default:
0056 ret = -EINVAL;
0057 }
0058 dlm_put_lockspace(ls);
0059 return ret;
0060 }
0061
0062 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
0063 {
0064 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
0065
0066 if (rc)
0067 return rc;
0068 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
0069 wake_up(&ls->ls_uevent_wait);
0070 return len;
0071 }
0072
0073 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
0074 {
0075 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
0076 }
0077
0078 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
0079 {
0080 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
0081
0082 if (rc)
0083 return rc;
0084 return len;
0085 }
0086
0087 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
0088 {
0089 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
0090 }
0091
0092 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
0093 {
0094 int val;
0095 int rc = kstrtoint(buf, 0, &val);
0096
0097 if (rc)
0098 return rc;
0099 if (val == 1)
0100 set_bit(LSFL_NODIR, &ls->ls_flags);
0101 return len;
0102 }
0103
0104 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
0105 {
0106 uint32_t status = dlm_recover_status(ls);
0107 return snprintf(buf, PAGE_SIZE, "%x\n", status);
0108 }
0109
0110 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
0111 {
0112 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
0113 }
0114
0115 struct dlm_attr {
0116 struct attribute attr;
0117 ssize_t (*show)(struct dlm_ls *, char *);
0118 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
0119 };
0120
0121 static struct dlm_attr dlm_attr_control = {
0122 .attr = {.name = "control", .mode = S_IWUSR},
0123 .store = dlm_control_store
0124 };
0125
0126 static struct dlm_attr dlm_attr_event = {
0127 .attr = {.name = "event_done", .mode = S_IWUSR},
0128 .store = dlm_event_store
0129 };
0130
0131 static struct dlm_attr dlm_attr_id = {
0132 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
0133 .show = dlm_id_show,
0134 .store = dlm_id_store
0135 };
0136
0137 static struct dlm_attr dlm_attr_nodir = {
0138 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
0139 .show = dlm_nodir_show,
0140 .store = dlm_nodir_store
0141 };
0142
0143 static struct dlm_attr dlm_attr_recover_status = {
0144 .attr = {.name = "recover_status", .mode = S_IRUGO},
0145 .show = dlm_recover_status_show
0146 };
0147
0148 static struct dlm_attr dlm_attr_recover_nodeid = {
0149 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
0150 .show = dlm_recover_nodeid_show
0151 };
0152
0153 static struct attribute *dlm_attrs[] = {
0154 &dlm_attr_control.attr,
0155 &dlm_attr_event.attr,
0156 &dlm_attr_id.attr,
0157 &dlm_attr_nodir.attr,
0158 &dlm_attr_recover_status.attr,
0159 &dlm_attr_recover_nodeid.attr,
0160 NULL,
0161 };
0162 ATTRIBUTE_GROUPS(dlm);
0163
0164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
0165 char *buf)
0166 {
0167 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
0168 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
0169 return a->show ? a->show(ls, buf) : 0;
0170 }
0171
0172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
0173 const char *buf, size_t len)
0174 {
0175 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
0176 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
0177 return a->store ? a->store(ls, buf, len) : len;
0178 }
0179
0180 static void lockspace_kobj_release(struct kobject *k)
0181 {
0182 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
0183 kfree(ls);
0184 }
0185
0186 static const struct sysfs_ops dlm_attr_ops = {
0187 .show = dlm_attr_show,
0188 .store = dlm_attr_store,
0189 };
0190
0191 static struct kobj_type dlm_ktype = {
0192 .default_groups = dlm_groups,
0193 .sysfs_ops = &dlm_attr_ops,
0194 .release = lockspace_kobj_release,
0195 };
0196
0197 static struct kset *dlm_kset;
0198
0199 static int do_uevent(struct dlm_ls *ls, int in)
0200 {
0201 if (in)
0202 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
0203 else
0204 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
0205
0206 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
0207
0208
0209
0210
0211 wait_event(ls->ls_uevent_wait,
0212 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
0213
0214 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
0215
0216 return ls->ls_uevent_result;
0217 }
0218
0219 static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
0220 {
0221 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
0222
0223 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
0224 return 0;
0225 }
0226
0227 static const struct kset_uevent_ops dlm_uevent_ops = {
0228 .uevent = dlm_uevent,
0229 };
0230
0231 int __init dlm_lockspace_init(void)
0232 {
0233 ls_count = 0;
0234 mutex_init(&ls_lock);
0235 INIT_LIST_HEAD(&lslist);
0236 spin_lock_init(&lslist_lock);
0237
0238 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
0239 if (!dlm_kset) {
0240 printk(KERN_WARNING "%s: can not create kset\n", __func__);
0241 return -ENOMEM;
0242 }
0243 return 0;
0244 }
0245
0246 void dlm_lockspace_exit(void)
0247 {
0248 kset_unregister(dlm_kset);
0249 }
0250
0251 static struct dlm_ls *find_ls_to_scan(void)
0252 {
0253 struct dlm_ls *ls;
0254
0255 spin_lock(&lslist_lock);
0256 list_for_each_entry(ls, &lslist, ls_list) {
0257 if (time_after_eq(jiffies, ls->ls_scan_time +
0258 dlm_config.ci_scan_secs * HZ)) {
0259 spin_unlock(&lslist_lock);
0260 return ls;
0261 }
0262 }
0263 spin_unlock(&lslist_lock);
0264 return NULL;
0265 }
0266
0267 static int dlm_scand(void *data)
0268 {
0269 struct dlm_ls *ls;
0270
0271 while (!kthread_should_stop()) {
0272 ls = find_ls_to_scan();
0273 if (ls) {
0274 if (dlm_lock_recovery_try(ls)) {
0275 ls->ls_scan_time = jiffies;
0276 dlm_scan_rsbs(ls);
0277 dlm_scan_timeout(ls);
0278 dlm_unlock_recovery(ls);
0279 } else {
0280 ls->ls_scan_time += HZ;
0281 }
0282 continue;
0283 }
0284 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
0285 }
0286 return 0;
0287 }
0288
0289 static int dlm_scand_start(void)
0290 {
0291 struct task_struct *p;
0292 int error = 0;
0293
0294 p = kthread_run(dlm_scand, NULL, "dlm_scand");
0295 if (IS_ERR(p))
0296 error = PTR_ERR(p);
0297 else
0298 scand_task = p;
0299 return error;
0300 }
0301
0302 static void dlm_scand_stop(void)
0303 {
0304 kthread_stop(scand_task);
0305 }
0306
0307 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
0308 {
0309 struct dlm_ls *ls;
0310
0311 spin_lock(&lslist_lock);
0312
0313 list_for_each_entry(ls, &lslist, ls_list) {
0314 if (ls->ls_global_id == id) {
0315 atomic_inc(&ls->ls_count);
0316 goto out;
0317 }
0318 }
0319 ls = NULL;
0320 out:
0321 spin_unlock(&lslist_lock);
0322 return ls;
0323 }
0324
0325 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
0326 {
0327 struct dlm_ls *ls;
0328
0329 spin_lock(&lslist_lock);
0330 list_for_each_entry(ls, &lslist, ls_list) {
0331 if (ls->ls_local_handle == lockspace) {
0332 atomic_inc(&ls->ls_count);
0333 goto out;
0334 }
0335 }
0336 ls = NULL;
0337 out:
0338 spin_unlock(&lslist_lock);
0339 return ls;
0340 }
0341
0342 struct dlm_ls *dlm_find_lockspace_device(int minor)
0343 {
0344 struct dlm_ls *ls;
0345
0346 spin_lock(&lslist_lock);
0347 list_for_each_entry(ls, &lslist, ls_list) {
0348 if (ls->ls_device.minor == minor) {
0349 atomic_inc(&ls->ls_count);
0350 goto out;
0351 }
0352 }
0353 ls = NULL;
0354 out:
0355 spin_unlock(&lslist_lock);
0356 return ls;
0357 }
0358
0359 void dlm_put_lockspace(struct dlm_ls *ls)
0360 {
0361 if (atomic_dec_and_test(&ls->ls_count))
0362 wake_up(&ls->ls_count_wait);
0363 }
0364
0365 static void remove_lockspace(struct dlm_ls *ls)
0366 {
0367 retry:
0368 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
0369
0370 spin_lock(&lslist_lock);
0371 if (atomic_read(&ls->ls_count) != 0) {
0372 spin_unlock(&lslist_lock);
0373 goto retry;
0374 }
0375
0376 WARN_ON(ls->ls_create_count != 0);
0377 list_del(&ls->ls_list);
0378 spin_unlock(&lslist_lock);
0379 }
0380
0381 static int threads_start(void)
0382 {
0383 int error;
0384
0385 error = dlm_scand_start();
0386 if (error) {
0387 log_print("cannot start dlm_scand thread %d", error);
0388 goto fail;
0389 }
0390
0391
0392 error = dlm_midcomms_start();
0393 if (error) {
0394 log_print("cannot start dlm lowcomms %d", error);
0395 goto scand_fail;
0396 }
0397
0398 return 0;
0399
0400 scand_fail:
0401 dlm_scand_stop();
0402 fail:
0403 return error;
0404 }
0405
0406 static int new_lockspace(const char *name, const char *cluster,
0407 uint32_t flags, int lvblen,
0408 const struct dlm_lockspace_ops *ops, void *ops_arg,
0409 int *ops_result, dlm_lockspace_t **lockspace)
0410 {
0411 struct dlm_ls *ls;
0412 int i, size, error;
0413 int do_unreg = 0;
0414 int namelen = strlen(name);
0415
0416 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
0417 return -EINVAL;
0418
0419 if (!lvblen || (lvblen % 8))
0420 return -EINVAL;
0421
0422 if (!try_module_get(THIS_MODULE))
0423 return -EINVAL;
0424
0425 if (!dlm_user_daemon_available()) {
0426 log_print("dlm user daemon not available");
0427 error = -EUNATCH;
0428 goto out;
0429 }
0430
0431 if (ops && ops_result) {
0432 if (!dlm_config.ci_recover_callbacks)
0433 *ops_result = -EOPNOTSUPP;
0434 else
0435 *ops_result = 0;
0436 }
0437
0438 if (!cluster)
0439 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
0440 dlm_config.ci_cluster_name);
0441
0442 if (dlm_config.ci_recover_callbacks && cluster &&
0443 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
0444 log_print("dlm cluster name '%s' does not match "
0445 "the application cluster name '%s'",
0446 dlm_config.ci_cluster_name, cluster);
0447 error = -EBADR;
0448 goto out;
0449 }
0450
0451 error = 0;
0452
0453 spin_lock(&lslist_lock);
0454 list_for_each_entry(ls, &lslist, ls_list) {
0455 WARN_ON(ls->ls_create_count <= 0);
0456 if (ls->ls_namelen != namelen)
0457 continue;
0458 if (memcmp(ls->ls_name, name, namelen))
0459 continue;
0460 if (flags & DLM_LSFL_NEWEXCL) {
0461 error = -EEXIST;
0462 break;
0463 }
0464 ls->ls_create_count++;
0465 *lockspace = ls;
0466 error = 1;
0467 break;
0468 }
0469 spin_unlock(&lslist_lock);
0470
0471 if (error)
0472 goto out;
0473
0474 error = -ENOMEM;
0475
0476 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
0477 if (!ls)
0478 goto out;
0479 memcpy(ls->ls_name, name, namelen);
0480 ls->ls_namelen = namelen;
0481 ls->ls_lvblen = lvblen;
0482 atomic_set(&ls->ls_count, 0);
0483 init_waitqueue_head(&ls->ls_count_wait);
0484 ls->ls_flags = 0;
0485 ls->ls_scan_time = jiffies;
0486
0487 if (ops && dlm_config.ci_recover_callbacks) {
0488 ls->ls_ops = ops;
0489 ls->ls_ops_arg = ops_arg;
0490 }
0491
0492 #ifdef CONFIG_DLM_DEPRECATED_API
0493 if (flags & DLM_LSFL_TIMEWARN) {
0494 pr_warn_once("===============================================================\n"
0495 "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
0496 " will be removed in v6.2!\n"
0497 " Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
0498 "===============================================================\n");
0499
0500 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
0501 }
0502
0503
0504
0505
0506 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
0507 DLM_LSFL_NEWEXCL));
0508 #else
0509
0510
0511
0512 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
0513 #endif
0514
0515 size = READ_ONCE(dlm_config.ci_rsbtbl_size);
0516 ls->ls_rsbtbl_size = size;
0517
0518 ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
0519 if (!ls->ls_rsbtbl)
0520 goto out_lsfree;
0521 for (i = 0; i < size; i++) {
0522 ls->ls_rsbtbl[i].keep.rb_node = NULL;
0523 ls->ls_rsbtbl[i].toss.rb_node = NULL;
0524 spin_lock_init(&ls->ls_rsbtbl[i].lock);
0525 }
0526
0527 spin_lock_init(&ls->ls_remove_spin);
0528 init_waitqueue_head(&ls->ls_remove_wait);
0529
0530 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
0531 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
0532 GFP_KERNEL);
0533 if (!ls->ls_remove_names[i])
0534 goto out_rsbtbl;
0535 }
0536
0537 idr_init(&ls->ls_lkbidr);
0538 spin_lock_init(&ls->ls_lkbidr_spin);
0539
0540 INIT_LIST_HEAD(&ls->ls_waiters);
0541 mutex_init(&ls->ls_waiters_mutex);
0542 INIT_LIST_HEAD(&ls->ls_orphans);
0543 mutex_init(&ls->ls_orphans_mutex);
0544 #ifdef CONFIG_DLM_DEPRECATED_API
0545 INIT_LIST_HEAD(&ls->ls_timeout);
0546 mutex_init(&ls->ls_timeout_mutex);
0547 #endif
0548
0549 INIT_LIST_HEAD(&ls->ls_new_rsb);
0550 spin_lock_init(&ls->ls_new_rsb_spin);
0551
0552 INIT_LIST_HEAD(&ls->ls_nodes);
0553 INIT_LIST_HEAD(&ls->ls_nodes_gone);
0554 ls->ls_num_nodes = 0;
0555 ls->ls_low_nodeid = 0;
0556 ls->ls_total_weight = 0;
0557 ls->ls_node_array = NULL;
0558
0559 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
0560 ls->ls_stub_rsb.res_ls = ls;
0561
0562 ls->ls_debug_rsb_dentry = NULL;
0563 ls->ls_debug_waiters_dentry = NULL;
0564
0565 init_waitqueue_head(&ls->ls_uevent_wait);
0566 ls->ls_uevent_result = 0;
0567 init_completion(&ls->ls_recovery_done);
0568 ls->ls_recovery_result = -1;
0569
0570 mutex_init(&ls->ls_cb_mutex);
0571 INIT_LIST_HEAD(&ls->ls_cb_delay);
0572
0573 ls->ls_recoverd_task = NULL;
0574 mutex_init(&ls->ls_recoverd_active);
0575 spin_lock_init(&ls->ls_recover_lock);
0576 spin_lock_init(&ls->ls_rcom_spin);
0577 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
0578 ls->ls_recover_status = 0;
0579 ls->ls_recover_seq = 0;
0580 ls->ls_recover_args = NULL;
0581 init_rwsem(&ls->ls_in_recovery);
0582 init_rwsem(&ls->ls_recv_active);
0583 INIT_LIST_HEAD(&ls->ls_requestqueue);
0584 atomic_set(&ls->ls_requestqueue_cnt, 0);
0585 init_waitqueue_head(&ls->ls_requestqueue_wait);
0586 mutex_init(&ls->ls_requestqueue_mutex);
0587 mutex_init(&ls->ls_clear_proc_locks);
0588
0589
0590
0591
0592
0593
0594 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
0595 if (!ls->ls_recover_buf)
0596 goto out_lkbidr;
0597
0598 ls->ls_slot = 0;
0599 ls->ls_num_slots = 0;
0600 ls->ls_slots_size = 0;
0601 ls->ls_slots = NULL;
0602
0603 INIT_LIST_HEAD(&ls->ls_recover_list);
0604 spin_lock_init(&ls->ls_recover_list_lock);
0605 idr_init(&ls->ls_recover_idr);
0606 spin_lock_init(&ls->ls_recover_idr_lock);
0607 ls->ls_recover_list_count = 0;
0608 ls->ls_local_handle = ls;
0609 init_waitqueue_head(&ls->ls_wait_general);
0610 INIT_LIST_HEAD(&ls->ls_root_list);
0611 init_rwsem(&ls->ls_root_sem);
0612
0613 spin_lock(&lslist_lock);
0614 ls->ls_create_count = 1;
0615 list_add(&ls->ls_list, &lslist);
0616 spin_unlock(&lslist_lock);
0617
0618 if (flags & DLM_LSFL_FS) {
0619 error = dlm_callback_start(ls);
0620 if (error) {
0621 log_error(ls, "can't start dlm_callback %d", error);
0622 goto out_delist;
0623 }
0624 }
0625
0626 init_waitqueue_head(&ls->ls_recover_lock_wait);
0627
0628
0629
0630
0631
0632
0633
0634
0635 error = dlm_recoverd_start(ls);
0636 if (error) {
0637 log_error(ls, "can't start dlm_recoverd %d", error);
0638 goto out_callback;
0639 }
0640
0641 wait_event(ls->ls_recover_lock_wait,
0642 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
0643
0644
0645 do_unreg = 1;
0646
0647 ls->ls_kobj.kset = dlm_kset;
0648 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
0649 "%s", ls->ls_name);
0650 if (error)
0651 goto out_recoverd;
0652 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
0653
0654
0655
0656
0657
0658
0659
0660 error = do_uevent(ls, 1);
0661 if (error)
0662 goto out_recoverd;
0663
0664
0665 wait_for_completion(&ls->ls_recovery_done);
0666 error = ls->ls_recovery_result;
0667 if (error)
0668 goto out_members;
0669
0670 dlm_create_debug_file(ls);
0671
0672 log_rinfo(ls, "join complete");
0673 *lockspace = ls;
0674 return 0;
0675
0676 out_members:
0677 do_uevent(ls, 0);
0678 dlm_clear_members(ls);
0679 kfree(ls->ls_node_array);
0680 out_recoverd:
0681 dlm_recoverd_stop(ls);
0682 out_callback:
0683 dlm_callback_stop(ls);
0684 out_delist:
0685 spin_lock(&lslist_lock);
0686 list_del(&ls->ls_list);
0687 spin_unlock(&lslist_lock);
0688 idr_destroy(&ls->ls_recover_idr);
0689 kfree(ls->ls_recover_buf);
0690 out_lkbidr:
0691 idr_destroy(&ls->ls_lkbidr);
0692 out_rsbtbl:
0693 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
0694 kfree(ls->ls_remove_names[i]);
0695 vfree(ls->ls_rsbtbl);
0696 out_lsfree:
0697 if (do_unreg)
0698 kobject_put(&ls->ls_kobj);
0699 else
0700 kfree(ls);
0701 out:
0702 module_put(THIS_MODULE);
0703 return error;
0704 }
0705
0706 int dlm_new_lockspace(const char *name, const char *cluster,
0707 uint32_t flags, int lvblen,
0708 const struct dlm_lockspace_ops *ops, void *ops_arg,
0709 int *ops_result, dlm_lockspace_t **lockspace)
0710 {
0711 int error = 0;
0712
0713 mutex_lock(&ls_lock);
0714 if (!ls_count)
0715 error = threads_start();
0716 if (error)
0717 goto out;
0718
0719 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
0720 ops_result, lockspace);
0721 if (!error)
0722 ls_count++;
0723 if (error > 0)
0724 error = 0;
0725 if (!ls_count) {
0726 dlm_scand_stop();
0727 dlm_midcomms_shutdown();
0728 dlm_lowcomms_stop();
0729 }
0730 out:
0731 mutex_unlock(&ls_lock);
0732 return error;
0733 }
0734
0735 static int lkb_idr_is_local(int id, void *p, void *data)
0736 {
0737 struct dlm_lkb *lkb = p;
0738
0739 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
0740 }
0741
0742 static int lkb_idr_is_any(int id, void *p, void *data)
0743 {
0744 return 1;
0745 }
0746
0747 static int lkb_idr_free(int id, void *p, void *data)
0748 {
0749 struct dlm_lkb *lkb = p;
0750
0751 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
0752 dlm_free_lvb(lkb->lkb_lvbptr);
0753
0754 dlm_free_lkb(lkb);
0755 return 0;
0756 }
0757
0758
0759
0760
0761
0762 static int lockspace_busy(struct dlm_ls *ls, int force)
0763 {
0764 int rv;
0765
0766 spin_lock(&ls->ls_lkbidr_spin);
0767 if (force == 0) {
0768 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
0769 } else if (force == 1) {
0770 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
0771 } else {
0772 rv = 0;
0773 }
0774 spin_unlock(&ls->ls_lkbidr_spin);
0775 return rv;
0776 }
0777
0778 static int release_lockspace(struct dlm_ls *ls, int force)
0779 {
0780 struct dlm_rsb *rsb;
0781 struct rb_node *n;
0782 int i, busy, rv;
0783
0784 busy = lockspace_busy(ls, force);
0785
0786 spin_lock(&lslist_lock);
0787 if (ls->ls_create_count == 1) {
0788 if (busy) {
0789 rv = -EBUSY;
0790 } else {
0791
0792 ls->ls_create_count = 0;
0793 rv = 0;
0794 }
0795 } else if (ls->ls_create_count > 1) {
0796 rv = --ls->ls_create_count;
0797 } else {
0798 rv = -EINVAL;
0799 }
0800 spin_unlock(&lslist_lock);
0801
0802 if (rv) {
0803 log_debug(ls, "release_lockspace no remove %d", rv);
0804 return rv;
0805 }
0806
0807 dlm_device_deregister(ls);
0808
0809 if (force < 3 && dlm_user_daemon_available())
0810 do_uevent(ls, 0);
0811
0812 dlm_recoverd_stop(ls);
0813
0814 if (ls_count == 1) {
0815 dlm_scand_stop();
0816 dlm_clear_members(ls);
0817 dlm_midcomms_shutdown();
0818 }
0819
0820 dlm_callback_stop(ls);
0821
0822 remove_lockspace(ls);
0823
0824 dlm_delete_debug_file(ls);
0825
0826 idr_destroy(&ls->ls_recover_idr);
0827 kfree(ls->ls_recover_buf);
0828
0829
0830
0831
0832
0833 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
0834 idr_destroy(&ls->ls_lkbidr);
0835
0836
0837
0838
0839
0840 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
0841 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
0842 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
0843 rb_erase(n, &ls->ls_rsbtbl[i].keep);
0844 dlm_free_rsb(rsb);
0845 }
0846
0847 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
0848 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
0849 rb_erase(n, &ls->ls_rsbtbl[i].toss);
0850 dlm_free_rsb(rsb);
0851 }
0852 }
0853
0854 vfree(ls->ls_rsbtbl);
0855
0856 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
0857 kfree(ls->ls_remove_names[i]);
0858
0859 while (!list_empty(&ls->ls_new_rsb)) {
0860 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
0861 res_hashchain);
0862 list_del(&rsb->res_hashchain);
0863 dlm_free_rsb(rsb);
0864 }
0865
0866
0867
0868
0869
0870 dlm_purge_requestqueue(ls);
0871 kfree(ls->ls_recover_args);
0872 dlm_clear_members(ls);
0873 dlm_clear_members_gone(ls);
0874 kfree(ls->ls_node_array);
0875 log_rinfo(ls, "release_lockspace final free");
0876 kobject_put(&ls->ls_kobj);
0877
0878
0879 module_put(THIS_MODULE);
0880 return 0;
0881 }
0882
0883
0884
0885
0886
0887
0888
0889
0890
0891
0892
0893
0894
0895
0896
0897 int dlm_release_lockspace(void *lockspace, int force)
0898 {
0899 struct dlm_ls *ls;
0900 int error;
0901
0902 ls = dlm_find_lockspace_local(lockspace);
0903 if (!ls)
0904 return -EINVAL;
0905 dlm_put_lockspace(ls);
0906
0907 mutex_lock(&ls_lock);
0908 error = release_lockspace(ls, force);
0909 if (!error)
0910 ls_count--;
0911 if (!ls_count)
0912 dlm_lowcomms_stop();
0913 mutex_unlock(&ls_lock);
0914
0915 return error;
0916 }
0917
0918 void dlm_stop_lockspaces(void)
0919 {
0920 struct dlm_ls *ls;
0921 int count;
0922
0923 restart:
0924 count = 0;
0925 spin_lock(&lslist_lock);
0926 list_for_each_entry(ls, &lslist, ls_list) {
0927 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
0928 count++;
0929 continue;
0930 }
0931 spin_unlock(&lslist_lock);
0932 log_error(ls, "no userland control daemon, stopping lockspace");
0933 dlm_ls_stop(ls);
0934 goto restart;
0935 }
0936 spin_unlock(&lslist_lock);
0937
0938 if (count)
0939 log_print("dlm user daemon left %d lockspaces", count);
0940 }
0941
0942 void dlm_stop_lockspaces_check(void)
0943 {
0944 struct dlm_ls *ls;
0945
0946 spin_lock(&lslist_lock);
0947 list_for_each_entry(ls, &lslist, ls_list) {
0948 if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
0949 !dlm_locking_stopped(ls)))
0950 break;
0951 }
0952 spin_unlock(&lslist_lock);
0953 }