// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct	*scand_task;
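
/*
 * Sysfs interface used by the userspace dlm_controld daemon.  The kset
 * created in dlm_lockspace_init() puts these files under
 * /sys/kernel/dlm/<lockspace>/.  Writing "0" to the "control" file
 * stops the lockspace, "1" starts it; an illustrative invocation
 * (hypothetical lockspace name):
 *
 *	echo 1 > /sys/kernel/dlm/mylockspace/control
 */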
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
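
/*
 * Each attribute bundles its own show/store callback; the generic
 * dlm_attr_show()/dlm_attr_store() handlers below recover the
 * lockspace from the embedded kobject and dispatch to them.
 */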
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					   dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}
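
/*
 * Example of the scan interval check above: assuming ci_scan_secs = 5
 * and HZ = 250, a lockspace becomes eligible for scanning once 1250
 * jiffies have elapsed since ls_scan_time.  time_after_eq() is safe
 * across jiffies wraparound.
 */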

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
#ifdef CONFIG_DLM_DEPRECATED_API
				dlm_scan_timeout(ls);
#endif
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}
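
/*
 * Typical caller pattern for the lookup helpers above and below
 * (sketch): the find routine takes a reference under lslist_lock,
 * which the caller drops with dlm_put_lockspace() when done:
 *
 *	ls = dlm_find_lockspace_global(id);
 *	if (!ls)
 *		return -EINVAL;
 *	...
 *	dlm_put_lockspace(ls);
 */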

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

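/*
 * Wait for the last reference to drop.  The count is rechecked under
 * lslist_lock because dlm_find_lockspace_*() take new references under
 * that lock; a lookup racing with the wakeup would otherwise be missed.
 */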
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}
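
/*
 * Create and register a new lockspace: allocate the dlm_ls, initialize
 * its tables, lists and locks, add it to lslist, start the callback
 * and recovery threads, announce it to userspace via a kobject uevent,
 * and wait for the first recovery cycle to finish.  Error paths unwind
 * in reverse order.
 */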
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

#ifdef CONFIG_DLM_DEPRECATED_API
	if (flags & DLM_LSFL_TIMEWARN) {
		pr_warn_once("===============================================================\n"
			     "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
			     "         will be removed in v6.2!\n"
			     "         This includes the DLM_LSFL_TIMEWARN define in the UAPI header!\n"
			     "===============================================================\n");

		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));
#else
	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
#endif

	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);
	init_waitqueue_head(&ls->ls_remove_wait);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
#ifdef CONFIG_DLM_DEPRECATED_API
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);
#endif

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	atomic_set(&ls->ls_requestqueue_cnt, 0);
	init_waitqueue_head(&ls->ls_requestqueue_wait);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	/* For backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * we don't have out of bounds issues.  However, a 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_scand_stop();
		dlm_midcomms_shutdown();
		dlm_lowcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}
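
/*
 * Illustrative caller sketch (assumed names, not part of this file):
 * a cluster filesystem might create and later tear down a lockspace
 * like this, with a 32-byte LVB and its own recovery callbacks:
 *
 *	dlm_lockspace_t *ls;
 *	int ops_result, error;
 *
 *	error = dlm_new_lockspace("example", "mycluster", DLM_LSFL_FS, 32,
 *				  &my_ops, my_arg, &ops_result, &ls);
 *	if (error)
 *		return error;
 *	...
 *	dlm_release_lockspace(ls, 2);
 */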

static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

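/*
 * lockspace_busy() maps the caller's force level onto the idr checks
 * above: force == 0 treats any remaining lkb as busy, force == 1 only
 * locally granted ones, and force >= 2 skips the check (see the force
 * value table above dlm_release_lockspace() below).
 */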
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkbs in the idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsbs on the rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_lowcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}

void dlm_stop_lockspaces_check(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
			    !dlm_locking_stopped(ls)))
			break;
	}
	spin_unlock(&lslist_lock);
}