Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  * stack_user.c
0004  *
0005  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
0006  *
0007  * Copyright (C) 2007 Oracle.  All rights reserved.
0008  */
0009 
0010 #include <linux/module.h>
0011 #include <linux/fs.h>
0012 #include <linux/miscdevice.h>
0013 #include <linux/mutex.h>
0014 #include <linux/slab.h>
0015 #include <linux/reboot.h>
0016 #include <linux/sched.h>
0017 #include <linux/uaccess.h>
0018 
0019 #include "stackglue.h"
0020 
0021 #include <linux/dlm_plock.h>
0022 
0023 /*
0024  * The control protocol starts with a handshake.  Until the handshake
0025  * is complete, the control device will fail all write(2)s.
0026  *
0027  * The handshake is simple.  First, the client reads until EOF.  Each line
0028  * of output is a supported protocol tag.  All protocol tags are a single
0029  * character followed by a two hex digit version number.  Currently the
0030  * only tag supported is T01, for "Text-based version 0x01".  Next, the
0031  * client writes the version they would like to use, including the newline.
0032  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
0033  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
0034  * client can start sending messages.
0035  *
0036  * The T01 protocol has three messages.  First is the "SETN" message.
0037  * It has the following syntax:
0038  *
0039  *  SETN<space><8-char-hex-nodenum><newline>
0040  *
0041  * This is 14 characters.
0042  *
0043  * The "SETN" message must be the first message following the protocol.
0044  * It tells ocfs2_control the local node number.
0045  *
0046  * Next comes the "SETV" message.  It has the following syntax:
0047  *
0048  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
0049  *
0050  * This is 11 characters.
0051  *
0052  * The "SETV" message sets the filesystem locking protocol version as
0053  * negotiated by the client.  The client negotiates based on the maximum
0054  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
0055  * number from the "SETV" message must match
0056  * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
0057  * must be less than or equal to ...sp_max_proto.pv_minor.
0058  *
0059  * Once this information has been set, mounts will be allowed.  From this
0060  * point on, the "DOWN" message can be sent for node down notification.
0061  * It has the following syntax:
0062  *
0063  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
0064  *
0065  * eg:
0066  *
0067  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
0068  *
0069  * This is 47 characters.
0070  */
0071 
/*
 * The protocol tag offered to the client by read(2) and expected back
 * from it by write(2) during the handshake.
 * For now, we have just one protocol version.
 */
#define OCFS2_CONTROL_PROTO				"T01\n"
#define OCFS2_CONTROL_PROTO_LEN				4

/* Handshake states, stored per open file in ocfs2_control_private.op_state */
#define OCFS2_CONTROL_HANDSHAKE_INVALID			(0)
#define OCFS2_CONTROL_HANDSHAKE_READ			(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL		(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID			(3)

/* Message opcodes and the exact byte length of each complete message */
#define OCFS2_CONTROL_MESSAGE_OP_LEN			4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP		"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN		14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP		"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP			"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN		47

/* Widths of the individual text fields inside a message */
#define OCFS2_TEXT_UUID_LEN				32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN		2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN		8

/* Name of the dlm lock whose LVB carries the protocol version */
#define VERSION_LOCK					"version_lock"
0097 
/*
 * Kind of live connection.  A NO_CONTROLD connection may attach even
 * when the control device has not been opened.
 */
enum ocfs2_connection_type {
	WITH_CONTROLD,
	NO_CONTROLD,
};
0102 
0103 /*
0104  * ocfs2_live_connection is refcounted because the filesystem and
0105  * miscdevice sides can detach in different order.  Let's just be safe.
0106  */
0107 struct ocfs2_live_connection {
0108     struct list_head        oc_list;
0109     struct ocfs2_cluster_connection *oc_conn;
0110     enum ocfs2_connection_type  oc_type;
0111     atomic_t                        oc_this_node;
0112     int                             oc_our_slot;
0113     struct dlm_lksb                 oc_version_lksb;
0114     char                            oc_lvb[DLM_LVB_LEN];
0115     struct completion               oc_sync_wait;
0116     wait_queue_head_t       oc_wait;
0117 };
0118 
0119 struct ocfs2_control_private {
0120     struct list_head op_list;
0121     int op_state;
0122     int op_this_node;
0123     struct ocfs2_protocol_version op_proto;
0124 };
0125 
0126 /* SETN<space><8-char-hex-nodenum><newline> */
0127 struct ocfs2_control_message_setn {
0128     char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
0129     char    space;
0130     char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
0131     char    newline;
0132 };
0133 
0134 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
0135 struct ocfs2_control_message_setv {
0136     char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
0137     char    space1;
0138     char    major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
0139     char    space2;
0140     char    minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
0141     char    newline;
0142 };
0143 
0144 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
0145 struct ocfs2_control_message_down {
0146     char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
0147     char    space1;
0148     char    uuid[OCFS2_TEXT_UUID_LEN];
0149     char    space2;
0150     char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
0151     char    newline;
0152 };
0153 
0154 union ocfs2_control_message {
0155     char                    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
0156     struct ocfs2_control_message_setn   u_setn;
0157     struct ocfs2_control_message_setv   u_setv;
0158     struct ocfs2_control_message_down   u_down;
0159 };
0160 
0161 static struct ocfs2_stack_plugin ocfs2_user_plugin;
0162 
0163 static atomic_t ocfs2_control_opened;
0164 static int ocfs2_control_this_node = -1;
0165 static struct ocfs2_protocol_version running_proto;
0166 
0167 static LIST_HEAD(ocfs2_live_connection_list);
0168 static LIST_HEAD(ocfs2_control_private_list);
0169 static DEFINE_MUTEX(ocfs2_control_lock);
0170 
0171 static inline void ocfs2_control_set_handshake_state(struct file *file,
0172                              int state)
0173 {
0174     struct ocfs2_control_private *p = file->private_data;
0175     p->op_state = state;
0176 }
0177 
0178 static inline int ocfs2_control_get_handshake_state(struct file *file)
0179 {
0180     struct ocfs2_control_private *p = file->private_data;
0181     return p->op_state;
0182 }
0183 
0184 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
0185 {
0186     size_t len = strlen(name);
0187     struct ocfs2_live_connection *c;
0188 
0189     BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
0190 
0191     list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
0192         if ((c->oc_conn->cc_namelen == len) &&
0193             !strncmp(c->oc_conn->cc_name, name, len))
0194             return c;
0195     }
0196 
0197     return NULL;
0198 }
0199 
0200 /*
0201  * ocfs2_live_connection structures are created underneath the ocfs2
0202  * mount path.  Since the VFS prevents multiple calls to
0203  * fill_super(), we can't get dupes here.
0204  */
0205 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
0206                      struct ocfs2_live_connection *c)
0207 {
0208     int rc = 0;
0209 
0210     mutex_lock(&ocfs2_control_lock);
0211     c->oc_conn = conn;
0212 
0213     if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
0214         list_add(&c->oc_list, &ocfs2_live_connection_list);
0215     else {
0216         printk(KERN_ERR
0217                "ocfs2: Userspace control daemon is not present\n");
0218         rc = -ESRCH;
0219     }
0220 
0221     mutex_unlock(&ocfs2_control_lock);
0222     return rc;
0223 }
0224 
0225 /*
0226  * This function disconnects the cluster connection from ocfs2_control.
0227  * Afterwards, userspace can't affect the cluster connection.
0228  */
0229 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
0230 {
0231     mutex_lock(&ocfs2_control_lock);
0232     list_del_init(&c->oc_list);
0233     c->oc_conn = NULL;
0234     mutex_unlock(&ocfs2_control_lock);
0235 
0236     kfree(c);
0237 }
0238 
0239 static int ocfs2_control_cfu(void *target, size_t target_len,
0240                  const char __user *buf, size_t count)
0241 {
0242     /* The T01 expects write(2) calls to have exactly one command */
0243     if ((count != target_len) ||
0244         (count > sizeof(union ocfs2_control_message)))
0245         return -EINVAL;
0246 
0247     if (copy_from_user(target, buf, target_len))
0248         return -EFAULT;
0249 
0250     return 0;
0251 }
0252 
0253 static ssize_t ocfs2_control_validate_protocol(struct file *file,
0254                            const char __user *buf,
0255                            size_t count)
0256 {
0257     ssize_t ret;
0258     char kbuf[OCFS2_CONTROL_PROTO_LEN];
0259 
0260     ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
0261                 buf, count);
0262     if (ret)
0263         return ret;
0264 
0265     if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
0266         return -EINVAL;
0267 
0268     ocfs2_control_set_handshake_state(file,
0269                       OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
0270 
0271     return count;
0272 }
0273 
0274 static void ocfs2_control_send_down(const char *uuid,
0275                     int nodenum)
0276 {
0277     struct ocfs2_live_connection *c;
0278 
0279     mutex_lock(&ocfs2_control_lock);
0280 
0281     c = ocfs2_connection_find(uuid);
0282     if (c) {
0283         BUG_ON(c->oc_conn == NULL);
0284         c->oc_conn->cc_recovery_handler(nodenum,
0285                         c->oc_conn->cc_recovery_data);
0286     }
0287 
0288     mutex_unlock(&ocfs2_control_lock);
0289 }
0290 
0291 /*
0292  * Called whenever configuration elements are sent to /dev/ocfs2_control.
0293  * If all configuration elements are present, try to set the global
0294  * values.  If there is a problem, return an error.  Skip any missing
0295  * elements, and only bump ocfs2_control_opened when we have all elements
0296  * and are successful.
0297  */
0298 static int ocfs2_control_install_private(struct file *file)
0299 {
0300     int rc = 0;
0301     int set_p = 1;
0302     struct ocfs2_control_private *p = file->private_data;
0303 
0304     BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
0305 
0306     mutex_lock(&ocfs2_control_lock);
0307 
0308     if (p->op_this_node < 0) {
0309         set_p = 0;
0310     } else if ((ocfs2_control_this_node >= 0) &&
0311            (ocfs2_control_this_node != p->op_this_node)) {
0312         rc = -EINVAL;
0313         goto out_unlock;
0314     }
0315 
0316     if (!p->op_proto.pv_major) {
0317         set_p = 0;
0318     } else if (!list_empty(&ocfs2_live_connection_list) &&
0319            ((running_proto.pv_major != p->op_proto.pv_major) ||
0320             (running_proto.pv_minor != p->op_proto.pv_minor))) {
0321         rc = -EINVAL;
0322         goto out_unlock;
0323     }
0324 
0325     if (set_p) {
0326         ocfs2_control_this_node = p->op_this_node;
0327         running_proto.pv_major = p->op_proto.pv_major;
0328         running_proto.pv_minor = p->op_proto.pv_minor;
0329     }
0330 
0331 out_unlock:
0332     mutex_unlock(&ocfs2_control_lock);
0333 
0334     if (!rc && set_p) {
0335         /* We set the global values successfully */
0336         atomic_inc(&ocfs2_control_opened);
0337         ocfs2_control_set_handshake_state(file,
0338                     OCFS2_CONTROL_HANDSHAKE_VALID);
0339     }
0340 
0341     return rc;
0342 }
0343 
0344 static int ocfs2_control_get_this_node(void)
0345 {
0346     int rc;
0347 
0348     mutex_lock(&ocfs2_control_lock);
0349     if (ocfs2_control_this_node < 0)
0350         rc = -EINVAL;
0351     else
0352         rc = ocfs2_control_this_node;
0353     mutex_unlock(&ocfs2_control_lock);
0354 
0355     return rc;
0356 }
0357 
0358 static int ocfs2_control_do_setnode_msg(struct file *file,
0359                     struct ocfs2_control_message_setn *msg)
0360 {
0361     long nodenum;
0362     char *ptr = NULL;
0363     struct ocfs2_control_private *p = file->private_data;
0364 
0365     if (ocfs2_control_get_handshake_state(file) !=
0366         OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
0367         return -EINVAL;
0368 
0369     if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
0370             OCFS2_CONTROL_MESSAGE_OP_LEN))
0371         return -EINVAL;
0372 
0373     if ((msg->space != ' ') || (msg->newline != '\n'))
0374         return -EINVAL;
0375     msg->space = msg->newline = '\0';
0376 
0377     nodenum = simple_strtol(msg->nodestr, &ptr, 16);
0378     if (!ptr || *ptr)
0379         return -EINVAL;
0380 
0381     if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
0382         (nodenum > INT_MAX) || (nodenum < 0))
0383         return -ERANGE;
0384     p->op_this_node = nodenum;
0385 
0386     return ocfs2_control_install_private(file);
0387 }
0388 
0389 static int ocfs2_control_do_setversion_msg(struct file *file,
0390                        struct ocfs2_control_message_setv *msg)
0391 {
0392     long major, minor;
0393     char *ptr = NULL;
0394     struct ocfs2_control_private *p = file->private_data;
0395     struct ocfs2_protocol_version *max =
0396         &ocfs2_user_plugin.sp_max_proto;
0397 
0398     if (ocfs2_control_get_handshake_state(file) !=
0399         OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
0400         return -EINVAL;
0401 
0402     if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
0403             OCFS2_CONTROL_MESSAGE_OP_LEN))
0404         return -EINVAL;
0405 
0406     if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
0407         (msg->newline != '\n'))
0408         return -EINVAL;
0409     msg->space1 = msg->space2 = msg->newline = '\0';
0410 
0411     major = simple_strtol(msg->major, &ptr, 16);
0412     if (!ptr || *ptr)
0413         return -EINVAL;
0414     minor = simple_strtol(msg->minor, &ptr, 16);
0415     if (!ptr || *ptr)
0416         return -EINVAL;
0417 
0418     /*
0419      * The major must be between 1 and 255, inclusive.  The minor
0420      * must be between 0 and 255, inclusive.  The version passed in
0421      * must be within the maximum version supported by the filesystem.
0422      */
0423     if ((major == LONG_MIN) || (major == LONG_MAX) ||
0424         (major > (u8)-1) || (major < 1))
0425         return -ERANGE;
0426     if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
0427         (minor > (u8)-1) || (minor < 0))
0428         return -ERANGE;
0429     if ((major != max->pv_major) ||
0430         (minor > max->pv_minor))
0431         return -EINVAL;
0432 
0433     p->op_proto.pv_major = major;
0434     p->op_proto.pv_minor = minor;
0435 
0436     return ocfs2_control_install_private(file);
0437 }
0438 
0439 static int ocfs2_control_do_down_msg(struct file *file,
0440                      struct ocfs2_control_message_down *msg)
0441 {
0442     long nodenum;
0443     char *p = NULL;
0444 
0445     if (ocfs2_control_get_handshake_state(file) !=
0446         OCFS2_CONTROL_HANDSHAKE_VALID)
0447         return -EINVAL;
0448 
0449     if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
0450             OCFS2_CONTROL_MESSAGE_OP_LEN))
0451         return -EINVAL;
0452 
0453     if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
0454         (msg->newline != '\n'))
0455         return -EINVAL;
0456     msg->space1 = msg->space2 = msg->newline = '\0';
0457 
0458     nodenum = simple_strtol(msg->nodestr, &p, 16);
0459     if (!p || *p)
0460         return -EINVAL;
0461 
0462     if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
0463         (nodenum > INT_MAX) || (nodenum < 0))
0464         return -ERANGE;
0465 
0466     ocfs2_control_send_down(msg->uuid, nodenum);
0467 
0468     return 0;
0469 }
0470 
0471 static ssize_t ocfs2_control_message(struct file *file,
0472                      const char __user *buf,
0473                      size_t count)
0474 {
0475     ssize_t ret;
0476     union ocfs2_control_message msg;
0477 
0478     /* Try to catch padding issues */
0479     WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
0480         (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
0481 
0482     memset(&msg, 0, sizeof(union ocfs2_control_message));
0483     ret = ocfs2_control_cfu(&msg, count, buf, count);
0484     if (ret)
0485         goto out;
0486 
0487     if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
0488         !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
0489              OCFS2_CONTROL_MESSAGE_OP_LEN))
0490         ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
0491     else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
0492          !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
0493               OCFS2_CONTROL_MESSAGE_OP_LEN))
0494         ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
0495     else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
0496          !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
0497               OCFS2_CONTROL_MESSAGE_OP_LEN))
0498         ret = ocfs2_control_do_down_msg(file, &msg.u_down);
0499     else
0500         ret = -EINVAL;
0501 
0502 out:
0503     return ret ? ret : count;
0504 }
0505 
0506 static ssize_t ocfs2_control_write(struct file *file,
0507                    const char __user *buf,
0508                    size_t count,
0509                    loff_t *ppos)
0510 {
0511     ssize_t ret;
0512 
0513     switch (ocfs2_control_get_handshake_state(file)) {
0514         case OCFS2_CONTROL_HANDSHAKE_INVALID:
0515             ret = -EINVAL;
0516             break;
0517 
0518         case OCFS2_CONTROL_HANDSHAKE_READ:
0519             ret = ocfs2_control_validate_protocol(file, buf,
0520                                   count);
0521             break;
0522 
0523         case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
0524         case OCFS2_CONTROL_HANDSHAKE_VALID:
0525             ret = ocfs2_control_message(file, buf, count);
0526             break;
0527 
0528         default:
0529             BUG();
0530             ret = -EIO;
0531             break;
0532     }
0533 
0534     return ret;
0535 }
0536 
0537 /*
0538  * This is a naive version.  If we ever have a new protocol, we'll expand
0539  * it.  Probably using seq_file.
0540  */
0541 static ssize_t ocfs2_control_read(struct file *file,
0542                   char __user *buf,
0543                   size_t count,
0544                   loff_t *ppos)
0545 {
0546     ssize_t ret;
0547 
0548     ret = simple_read_from_buffer(buf, count, ppos,
0549             OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
0550 
0551     /* Have we read the whole protocol list? */
0552     if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
0553         ocfs2_control_set_handshake_state(file,
0554                           OCFS2_CONTROL_HANDSHAKE_READ);
0555 
0556     return ret;
0557 }
0558 
0559 static int ocfs2_control_release(struct inode *inode, struct file *file)
0560 {
0561     struct ocfs2_control_private *p = file->private_data;
0562 
0563     mutex_lock(&ocfs2_control_lock);
0564 
0565     if (ocfs2_control_get_handshake_state(file) !=
0566         OCFS2_CONTROL_HANDSHAKE_VALID)
0567         goto out;
0568 
0569     if (atomic_dec_and_test(&ocfs2_control_opened)) {
0570         if (!list_empty(&ocfs2_live_connection_list)) {
0571             /* XXX: Do bad things! */
0572             printk(KERN_ERR
0573                    "ocfs2: Unexpected release of ocfs2_control!\n"
0574                    "       Loss of cluster connection requires "
0575                    "an emergency restart!\n");
0576             emergency_restart();
0577         }
0578         /*
0579          * Last valid close clears the node number and resets
0580          * the locking protocol version
0581          */
0582         ocfs2_control_this_node = -1;
0583         running_proto.pv_major = 0;
0584         running_proto.pv_minor = 0;
0585     }
0586 
0587 out:
0588     list_del_init(&p->op_list);
0589     file->private_data = NULL;
0590 
0591     mutex_unlock(&ocfs2_control_lock);
0592 
0593     kfree(p);
0594 
0595     return 0;
0596 }
0597 
0598 static int ocfs2_control_open(struct inode *inode, struct file *file)
0599 {
0600     struct ocfs2_control_private *p;
0601 
0602     p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
0603     if (!p)
0604         return -ENOMEM;
0605     p->op_this_node = -1;
0606 
0607     mutex_lock(&ocfs2_control_lock);
0608     file->private_data = p;
0609     list_add(&p->op_list, &ocfs2_control_private_list);
0610     mutex_unlock(&ocfs2_control_lock);
0611 
0612     return 0;
0613 }
0614 
0615 static const struct file_operations ocfs2_control_fops = {
0616     .open    = ocfs2_control_open,
0617     .release = ocfs2_control_release,
0618     .read    = ocfs2_control_read,
0619     .write   = ocfs2_control_write,
0620     .owner   = THIS_MODULE,
0621     .llseek  = default_llseek,
0622 };
0623 
0624 static struct miscdevice ocfs2_control_device = {
0625     .minor      = MISC_DYNAMIC_MINOR,
0626     .name       = "ocfs2_control",
0627     .fops       = &ocfs2_control_fops,
0628 };
0629 
0630 static int ocfs2_control_init(void)
0631 {
0632     int rc;
0633 
0634     atomic_set(&ocfs2_control_opened, 0);
0635 
0636     rc = misc_register(&ocfs2_control_device);
0637     if (rc)
0638         printk(KERN_ERR
0639                "ocfs2: Unable to register ocfs2_control device "
0640                "(errno %d)\n",
0641                -rc);
0642 
0643     return rc;
0644 }
0645 
0646 static void ocfs2_control_exit(void)
0647 {
0648     misc_deregister(&ocfs2_control_device);
0649 }
0650 
0651 static void fsdlm_lock_ast_wrapper(void *astarg)
0652 {
0653     struct ocfs2_dlm_lksb *lksb = astarg;
0654     int status = lksb->lksb_fsdlm.sb_status;
0655 
0656     /*
0657      * For now we're punting on the issue of other non-standard errors
0658      * where we can't tell if the unlock_ast or lock_ast should be called.
0659      * The main "other error" that's possible is EINVAL which means the
0660      * function was called with invalid args, which shouldn't be possible
0661      * since the caller here is under our control.  Other non-standard
0662      * errors probably fall into the same category, or otherwise are fatal
0663      * which means we can't carry on anyway.
0664      */
0665 
0666     if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
0667         lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
0668     else
0669         lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
0670 }
0671 
0672 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
0673 {
0674     struct ocfs2_dlm_lksb *lksb = astarg;
0675 
0676     lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
0677 }
0678 
0679 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
0680              int mode,
0681              struct ocfs2_dlm_lksb *lksb,
0682              u32 flags,
0683              void *name,
0684              unsigned int namelen)
0685 {
0686     if (!lksb->lksb_fsdlm.sb_lvbptr)
0687         lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
0688                          sizeof(struct dlm_lksb);
0689 
0690     return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
0691             flags|DLM_LKF_NODLCKWT, name, namelen, 0,
0692             fsdlm_lock_ast_wrapper, lksb,
0693             fsdlm_blocking_ast_wrapper);
0694 }
0695 
0696 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
0697                struct ocfs2_dlm_lksb *lksb,
0698                u32 flags)
0699 {
0700     return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
0701               flags, &lksb->lksb_fsdlm, lksb);
0702 }
0703 
0704 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
0705 {
0706     return lksb->lksb_fsdlm.sb_status;
0707 }
0708 
0709 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
0710 {
0711     int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
0712 
0713     return !invalid;
0714 }
0715 
0716 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
0717 {
0718     if (!lksb->lksb_fsdlm.sb_lvbptr)
0719         lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
0720                          sizeof(struct dlm_lksb);
0721     return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
0722 }
0723 
/* Intentionally empty: this stack has nothing to dump for @lksb. */
static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
0727 
0728 static int user_plock(struct ocfs2_cluster_connection *conn,
0729               u64 ino,
0730               struct file *file,
0731               int cmd,
0732               struct file_lock *fl)
0733 {
0734     /*
0735      * This more or less just demuxes the plock request into any
0736      * one of three dlm calls.
0737      *
0738      * Internally, fs/dlm will pass these to a misc device, which
0739      * a userspace daemon will read and write to.
0740      *
0741      * For now, cancel requests (which happen internally only),
0742      * are turned into unlocks. Most of this function taken from
0743      * gfs2_lock.
0744      */
0745 
0746     if (cmd == F_CANCELLK) {
0747         cmd = F_SETLK;
0748         fl->fl_type = F_UNLCK;
0749     }
0750 
0751     if (IS_GETLK(cmd))
0752         return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
0753     else if (fl->fl_type == F_UNLCK)
0754         return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
0755     else
0756         return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
0757 }
0758 
0759 /*
0760  * Compare a requested locking protocol version against the current one.
0761  *
0762  * If the major numbers are different, they are incompatible.
0763  * If the current minor is greater than the request, they are incompatible.
0764  * If the current minor is less than or equal to the request, they are
0765  * compatible, and the requester should run at the current minor version.
0766  */
0767 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
0768                    struct ocfs2_protocol_version *request)
0769 {
0770     if (existing->pv_major != request->pv_major)
0771         return 1;
0772 
0773     if (existing->pv_minor > request->pv_minor)
0774         return 1;
0775 
0776     if (existing->pv_minor < request->pv_minor)
0777         request->pv_minor = existing->pv_minor;
0778 
0779     return 0;
0780 }
0781 
0782 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
0783 {
0784     struct ocfs2_protocol_version *pv =
0785         (struct ocfs2_protocol_version *)lvb;
0786     /*
0787      * ocfs2_protocol_version has two u8 variables, so we don't
0788      * need any endian conversion.
0789      */
0790     ver->pv_major = pv->pv_major;
0791     ver->pv_minor = pv->pv_minor;
0792 }
0793 
0794 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
0795 {
0796     struct ocfs2_protocol_version *pv =
0797         (struct ocfs2_protocol_version *)lvb;
0798     /*
0799      * ocfs2_protocol_version has two u8 variables, so we don't
0800      * need any endian conversion.
0801      */
0802     pv->pv_major = ver->pv_major;
0803     pv->pv_minor = ver->pv_minor;
0804 }
0805 
0806 static void sync_wait_cb(void *arg)
0807 {
0808     struct ocfs2_cluster_connection *conn = arg;
0809     struct ocfs2_live_connection *lc = conn->cc_private;
0810     complete(&lc->oc_sync_wait);
0811 }
0812 
0813 static int sync_unlock(struct ocfs2_cluster_connection *conn,
0814         struct dlm_lksb *lksb, char *name)
0815 {
0816     int error;
0817     struct ocfs2_live_connection *lc = conn->cc_private;
0818 
0819     error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
0820     if (error) {
0821         printk(KERN_ERR "%s lkid %x error %d\n",
0822                 name, lksb->sb_lkid, error);
0823         return error;
0824     }
0825 
0826     wait_for_completion(&lc->oc_sync_wait);
0827 
0828     if (lksb->sb_status != -DLM_EUNLOCK) {
0829         printk(KERN_ERR "%s lkid %x status %d\n",
0830                 name, lksb->sb_lkid, lksb->sb_status);
0831         return -1;
0832     }
0833     return 0;
0834 }
0835 
0836 static int sync_lock(struct ocfs2_cluster_connection *conn,
0837         int mode, uint32_t flags,
0838         struct dlm_lksb *lksb, char *name)
0839 {
0840     int error, status;
0841     struct ocfs2_live_connection *lc = conn->cc_private;
0842 
0843     error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
0844             name, strlen(name),
0845             0, sync_wait_cb, conn, NULL);
0846     if (error) {
0847         printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
0848                 name, lksb->sb_lkid, flags, mode, error);
0849         return error;
0850     }
0851 
0852     wait_for_completion(&lc->oc_sync_wait);
0853 
0854     status = lksb->sb_status;
0855 
0856     if (status && status != -EAGAIN) {
0857         printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
0858                 name, lksb->sb_lkid, flags, mode, status);
0859     }
0860 
0861     return status;
0862 }
0863 
0864 
0865 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
0866         int flags)
0867 {
0868     struct ocfs2_live_connection *lc = conn->cc_private;
0869     return sync_lock(conn, mode, flags,
0870             &lc->oc_version_lksb, VERSION_LOCK);
0871 }
0872 
0873 static int version_unlock(struct ocfs2_cluster_connection *conn)
0874 {
0875     struct ocfs2_live_connection *lc = conn->cc_private;
0876     return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
0877 }
0878 
0879 /* get_protocol_version()
0880  *
0881  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
0882  * The algorithm is:
0883  * 1. Attempt to take the lock in EX mode (non-blocking).
0884  * 2. If successful (which means it is the first mount), write the
0885  *    version number and downconvert to PR lock.
0886  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
0887  *    taking the PR lock.
0888  */
0889 
0890 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
0891 {
0892     int ret;
0893     struct ocfs2_live_connection *lc = conn->cc_private;
0894     struct ocfs2_protocol_version pv;
0895 
0896     running_proto.pv_major =
0897         ocfs2_user_plugin.sp_max_proto.pv_major;
0898     running_proto.pv_minor =
0899         ocfs2_user_plugin.sp_max_proto.pv_minor;
0900 
0901     lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
0902     ret = version_lock(conn, DLM_LOCK_EX,
0903             DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
0904     if (!ret) {
0905         conn->cc_version.pv_major = running_proto.pv_major;
0906         conn->cc_version.pv_minor = running_proto.pv_minor;
0907         version_to_lvb(&running_proto, lc->oc_lvb);
0908         version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
0909     } else if (ret == -EAGAIN) {
0910         ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
0911         if (ret)
0912             goto out;
0913         lvb_to_version(lc->oc_lvb, &pv);
0914 
0915         if ((pv.pv_major != running_proto.pv_major) ||
0916                 (pv.pv_minor > running_proto.pv_minor)) {
0917             ret = -EINVAL;
0918             goto out;
0919         }
0920 
0921         conn->cc_version.pv_major = pv.pv_major;
0922         conn->cc_version.pv_minor = pv.pv_minor;
0923     }
0924 out:
0925     return ret;
0926 }
0927 
/*
 * .recover_prep callback of ocfs2_ls_ops.  Intentionally empty: this
 * stack takes no action in this callback and @arg is unused.
 */
static void user_recover_prep(void *arg)
{
    /* nothing to do */
}
0931 
0932 static void user_recover_slot(void *arg, struct dlm_slot *slot)
0933 {
0934     struct ocfs2_cluster_connection *conn = arg;
0935     printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
0936             slot->nodeid, slot->slot);
0937     conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
0938 
0939 }
0940 
0941 static void user_recover_done(void *arg, struct dlm_slot *slots,
0942         int num_slots, int our_slot,
0943         uint32_t generation)
0944 {
0945     struct ocfs2_cluster_connection *conn = arg;
0946     struct ocfs2_live_connection *lc = conn->cc_private;
0947     int i;
0948 
0949     for (i = 0; i < num_slots; i++)
0950         if (slots[i].slot == our_slot) {
0951             atomic_set(&lc->oc_this_node, slots[i].nodeid);
0952             break;
0953         }
0954 
0955     lc->oc_our_slot = our_slot;
0956     wake_up(&lc->oc_wait);
0957 }
0958 
0959 static const struct dlm_lockspace_ops ocfs2_ls_ops = {
0960     .recover_prep = user_recover_prep,
0961     .recover_slot = user_recover_slot,
0962     .recover_done = user_recover_done,
0963 };
0964 
0965 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
0966 {
0967     version_unlock(conn);
0968     dlm_release_lockspace(conn->cc_lockspace, 2);
0969     conn->cc_lockspace = NULL;
0970     ocfs2_live_connection_drop(conn->cc_private);
0971     conn->cc_private = NULL;
0972     return 0;
0973 }
0974 
0975 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
0976 {
0977     dlm_lockspace_t *fsdlm;
0978     struct ocfs2_live_connection *lc;
0979     int rc, ops_rv;
0980 
0981     BUG_ON(conn == NULL);
0982 
0983     lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
0984     if (!lc)
0985         return -ENOMEM;
0986 
0987     init_waitqueue_head(&lc->oc_wait);
0988     init_completion(&lc->oc_sync_wait);
0989     atomic_set(&lc->oc_this_node, 0);
0990     conn->cc_private = lc;
0991     lc->oc_type = NO_CONTROLD;
0992 
0993     rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
0994                    DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
0995                    &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
0996     if (rc) {
0997         if (rc == -EEXIST || rc == -EPROTO)
0998             printk(KERN_ERR "ocfs2: Unable to create the "
0999                 "lockspace %s (%d), because a ocfs2-tools "
1000                 "program is running on this file system "
1001                 "with the same name lockspace\n",
1002                 conn->cc_name, rc);
1003         goto out;
1004     }
1005 
1006     if (ops_rv == -EOPNOTSUPP) {
1007         lc->oc_type = WITH_CONTROLD;
1008         printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1009                 "version of dlm_controld and/or ocfs2-tools."
1010                 " Please consider upgrading.\n");
1011     } else if (ops_rv) {
1012         rc = ops_rv;
1013         goto out;
1014     }
1015     conn->cc_lockspace = fsdlm;
1016 
1017     rc = ocfs2_live_connection_attach(conn, lc);
1018     if (rc)
1019         goto out;
1020 
1021     if (lc->oc_type == NO_CONTROLD) {
1022         rc = get_protocol_version(conn);
1023         if (rc) {
1024             printk(KERN_ERR "ocfs2: Could not determine"
1025                     " locking version\n");
1026             user_cluster_disconnect(conn);
1027             goto out;
1028         }
1029         wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1030     }
1031 
1032     /*
1033      * running_proto must have been set before we allowed any mounts
1034      * to proceed.
1035      */
1036     if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1037         printk(KERN_ERR
1038                "Unable to mount with fs locking protocol version "
1039                "%u.%u because negotiated protocol is %u.%u\n",
1040                conn->cc_version.pv_major, conn->cc_version.pv_minor,
1041                running_proto.pv_major, running_proto.pv_minor);
1042         rc = -EPROTO;
1043         ocfs2_live_connection_drop(lc);
1044         lc = NULL;
1045     }
1046 
1047 out:
1048     if (rc)
1049         kfree(lc);
1050     return rc;
1051 }
1052 
1053 
1054 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1055                   unsigned int *this_node)
1056 {
1057     int rc;
1058     struct ocfs2_live_connection *lc = conn->cc_private;
1059 
1060     if (lc->oc_type == WITH_CONTROLD)
1061         rc = ocfs2_control_get_this_node();
1062     else if (lc->oc_type == NO_CONTROLD)
1063         rc = atomic_read(&lc->oc_this_node);
1064     else
1065         rc = -EINVAL;
1066 
1067     if (rc < 0)
1068         return rc;
1069 
1070     *this_node = rc;
1071     return 0;
1072 }
1073 
1074 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
1075     .connect    = user_cluster_connect,
1076     .disconnect = user_cluster_disconnect,
1077     .this_node  = user_cluster_this_node,
1078     .dlm_lock   = user_dlm_lock,
1079     .dlm_unlock = user_dlm_unlock,
1080     .lock_status    = user_dlm_lock_status,
1081     .lvb_valid  = user_dlm_lvb_valid,
1082     .lock_lvb   = user_dlm_lvb,
1083     .plock      = user_plock,
1084     .dump_lksb  = user_dlm_dump_lksb,
1085 };
1086 
1087 static struct ocfs2_stack_plugin ocfs2_user_plugin = {
1088     .sp_name    = "user",
1089     .sp_ops     = &ocfs2_user_plugin_ops,
1090     .sp_owner   = THIS_MODULE,
1091 };
1092 
1093 
1094 static int __init ocfs2_user_plugin_init(void)
1095 {
1096     int rc;
1097 
1098     rc = ocfs2_control_init();
1099     if (!rc) {
1100         rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1101         if (rc)
1102             ocfs2_control_exit();
1103     }
1104 
1105     return rc;
1106 }
1107 
1108 static void __exit ocfs2_user_plugin_exit(void)
1109 {
1110     ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1111     ocfs2_control_exit();
1112 }
1113 
1114 MODULE_AUTHOR("Oracle");
1115 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
1116 MODULE_LICENSE("GPL");
1117 module_init(ocfs2_user_plugin_init);
1118 module_exit(ocfs2_user_plugin_exit);