0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include <linux/module.h>
0012 #include <linux/fs.h>
0013 #include <linux/types.h>
0014 #include <linux/highmem.h>
0015 #include <linux/init.h>
0016 #include <linux/sysctl.h>
0017 #include <linux/random.h>
0018 #include <linux/blkdev.h>
0019 #include <linux/socket.h>
0020 #include <linux/inet.h>
0021 #include <linux/spinlock.h>
0022
0023
0024 #include "../cluster/heartbeat.h"
0025 #include "../cluster/nodemanager.h"
0026 #include "../cluster/tcp.h"
0027
0028 #include "dlmapi.h"
0029 #include "dlmcommon.h"
0030
0031 #include "dlmconvert.h"
0032
0033 #define MLOG_MASK_PREFIX ML_DLM
0034 #include "../cluster/masklog.h"
0035
0036
0037
0038
0039
0040
/*
 * Forward declarations of the file-local helpers defined further down.
 */

/* Core convert step; the definition asserts that res->spinlock is held. */
static enum dlm_status
__dlmconvert_master(struct dlm_ctxt *dlm,
		    struct dlm_lock_resource *res,
		    struct dlm_lock *lock, int flags,
		    int type, int *call_ast,
		    int *kick_thread);

/* Sends DLM_CONVERT_LOCK_MSG to res->owner and maps the outcome to a status. */
static enum dlm_status
dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res,
				struct dlm_lock *lock, int flags, int type);
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059 enum dlm_status dlmconvert_master(struct dlm_ctxt *dlm,
0060 struct dlm_lock_resource *res,
0061 struct dlm_lock *lock, int flags, int type)
0062 {
0063 int call_ast = 0, kick_thread = 0;
0064 enum dlm_status status;
0065
0066 spin_lock(&res->spinlock);
0067
0068 __dlm_wait_on_lockres(res);
0069 __dlm_lockres_reserve_ast(res);
0070 res->state |= DLM_LOCK_RES_IN_PROGRESS;
0071
0072 status = __dlmconvert_master(dlm, res, lock, flags, type,
0073 &call_ast, &kick_thread);
0074
0075 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0076 spin_unlock(&res->spinlock);
0077 wake_up(&res->wq);
0078 if (status != DLM_NORMAL && status != DLM_NOTQUEUED)
0079 dlm_error(status);
0080
0081
0082 if (call_ast)
0083 dlm_queue_ast(dlm, lock);
0084 else
0085 dlm_lockres_release_ast(dlm, res);
0086
0087 if (kick_thread)
0088 dlm_kick_thread(dlm, res);
0089
0090 return status;
0091 }
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102 static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
0103 struct dlm_lock_resource *res,
0104 struct dlm_lock *lock, int flags,
0105 int type, int *call_ast,
0106 int *kick_thread)
0107 {
0108 enum dlm_status status = DLM_NORMAL;
0109 struct dlm_lock *tmplock=NULL;
0110
0111 assert_spin_locked(&res->spinlock);
0112
0113 mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
0114 lock->ml.type, lock->ml.convert_type, type);
0115
0116 spin_lock(&lock->spinlock);
0117
0118
0119 if (lock->ml.convert_type != LKM_IVMODE) {
0120 mlog(ML_ERROR, "attempted to convert a lock with a lock "
0121 "conversion pending\n");
0122 status = DLM_DENIED;
0123 goto unlock_exit;
0124 }
0125
0126
0127 if (!dlm_lock_on_list(&res->granted, lock)) {
0128 mlog(ML_ERROR, "attempted to convert a lock not on grant "
0129 "queue\n");
0130 status = DLM_DENIED;
0131 goto unlock_exit;
0132 }
0133
0134 if (flags & LKM_VALBLK) {
0135 switch (lock->ml.type) {
0136 case LKM_EXMODE:
0137
0138 mlog(0, "will set lvb: converting %s->%s\n",
0139 dlm_lock_mode_name(lock->ml.type),
0140 dlm_lock_mode_name(type));
0141 lock->lksb->flags |= DLM_LKSB_PUT_LVB;
0142 break;
0143 case LKM_PRMODE:
0144 case LKM_NLMODE:
0145
0146 if (type > LKM_NLMODE) {
0147 mlog(0, "will fetch new value into "
0148 "lvb: converting %s->%s\n",
0149 dlm_lock_mode_name(lock->ml.type),
0150 dlm_lock_mode_name(type));
0151 lock->lksb->flags |= DLM_LKSB_GET_LVB;
0152 } else {
0153 mlog(0, "will NOT fetch new value "
0154 "into lvb: converting %s->%s\n",
0155 dlm_lock_mode_name(lock->ml.type),
0156 dlm_lock_mode_name(type));
0157 flags &= ~(LKM_VALBLK);
0158 }
0159 break;
0160 }
0161 }
0162
0163
0164
0165 if (type <= lock->ml.type)
0166 goto grant;
0167
0168
0169 status = DLM_NORMAL;
0170 list_for_each_entry(tmplock, &res->granted, list) {
0171 if (tmplock == lock)
0172 continue;
0173 if (!dlm_lock_compatible(tmplock->ml.type, type))
0174 goto switch_queues;
0175 }
0176
0177 list_for_each_entry(tmplock, &res->converting, list) {
0178 if (!dlm_lock_compatible(tmplock->ml.type, type))
0179 goto switch_queues;
0180
0181 if (!dlm_lock_compatible(tmplock->ml.convert_type, type))
0182 goto switch_queues;
0183 }
0184
0185
0186
0187 grant:
0188 mlog(0, "res %.*s, granting %s lock\n", res->lockname.len,
0189 res->lockname.name, dlm_lock_mode_name(type));
0190
0191 lock->lksb->status = DLM_NORMAL;
0192 if (lock->ml.node == dlm->node_num)
0193 mlog(0, "doing in-place convert for nonlocal lock\n");
0194 lock->ml.type = type;
0195 if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
0196 memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
0197
0198
0199
0200
0201
0202 list_move_tail(&lock->list, &res->granted);
0203
0204 status = DLM_NORMAL;
0205 *call_ast = 1;
0206 goto unlock_exit;
0207
0208 switch_queues:
0209 if (flags & LKM_NOQUEUE) {
0210 mlog(0, "failed to convert NOQUEUE lock %.*s from "
0211 "%d to %d...\n", res->lockname.len, res->lockname.name,
0212 lock->ml.type, type);
0213 status = DLM_NOTQUEUED;
0214 goto unlock_exit;
0215 }
0216 mlog(0, "res %.*s, queueing...\n", res->lockname.len,
0217 res->lockname.name);
0218
0219 lock->ml.convert_type = type;
0220
0221 list_move_tail(&lock->list, &res->converting);
0222
0223 unlock_exit:
0224 spin_unlock(&lock->spinlock);
0225 if (status == DLM_DENIED) {
0226 __dlm_print_one_lock_resource(res);
0227 }
0228 if (status == DLM_NORMAL)
0229 *kick_thread = 1;
0230 return status;
0231 }
0232
0233 void dlm_revert_pending_convert(struct dlm_lock_resource *res,
0234 struct dlm_lock *lock)
0235 {
0236
0237 list_move_tail(&lock->list, &res->granted);
0238 lock->ml.convert_type = LKM_IVMODE;
0239 lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
0240 }
0241
0242
0243
0244
0245
0246
0247
0248
0249 enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
0250 struct dlm_lock_resource *res,
0251 struct dlm_lock *lock, int flags, int type)
0252 {
0253 enum dlm_status status;
0254
0255 mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
0256 lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
0257
0258 spin_lock(&res->spinlock);
0259 if (res->state & DLM_LOCK_RES_RECOVERING) {
0260 mlog(0, "bailing out early since res is RECOVERING "
0261 "on secondary queue\n");
0262
0263 status = DLM_RECOVERING;
0264 goto bail;
0265 }
0266
0267 __dlm_wait_on_lockres(res);
0268
0269 if (lock->ml.convert_type != LKM_IVMODE) {
0270 __dlm_print_one_lock_resource(res);
0271 mlog(ML_ERROR, "converting a remote lock that is already "
0272 "converting! (cookie=%u:%llu, conv=%d)\n",
0273 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
0274 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
0275 lock->ml.convert_type);
0276 status = DLM_DENIED;
0277 goto bail;
0278 }
0279
0280 if (lock->ml.type == type && lock->ml.convert_type == LKM_IVMODE) {
0281 mlog(0, "last convert request returned DLM_RECOVERING, but "
0282 "owner has already queued and sent ast to me. res %.*s, "
0283 "(cookie=%u:%llu, type=%d, conv=%d)\n",
0284 res->lockname.len, res->lockname.name,
0285 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
0286 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
0287 lock->ml.type, lock->ml.convert_type);
0288 status = DLM_NORMAL;
0289 goto bail;
0290 }
0291
0292 res->state |= DLM_LOCK_RES_IN_PROGRESS;
0293
0294
0295 list_move_tail(&lock->list, &res->converting);
0296 lock->convert_pending = 1;
0297 lock->ml.convert_type = type;
0298
0299 if (flags & LKM_VALBLK) {
0300 if (lock->ml.type == LKM_EXMODE) {
0301 flags |= LKM_PUT_LVB;
0302 lock->lksb->flags |= DLM_LKSB_PUT_LVB;
0303 } else {
0304 if (lock->ml.convert_type == LKM_NLMODE)
0305 flags &= ~LKM_VALBLK;
0306 else {
0307 flags |= LKM_GET_LVB;
0308 lock->lksb->flags |= DLM_LKSB_GET_LVB;
0309 }
0310 }
0311 }
0312 spin_unlock(&res->spinlock);
0313
0314
0315
0316 status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
0317
0318 spin_lock(&res->spinlock);
0319 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0320
0321
0322
0323
0324 if (status != DLM_NORMAL) {
0325 if (status != DLM_NOTQUEUED)
0326 dlm_error(status);
0327 dlm_revert_pending_convert(res, lock);
0328 } else if (!lock->convert_pending) {
0329 mlog(0, "%s: res %.*s, owner died and lock has been moved back "
0330 "to granted list, retry convert.\n",
0331 dlm->name, res->lockname.len, res->lockname.name);
0332 status = DLM_RECOVERING;
0333 }
0334
0335 lock->convert_pending = 0;
0336 bail:
0337 spin_unlock(&res->spinlock);
0338
0339
0340
0341 wake_up(&res->wq);
0342
0343 return status;
0344 }
0345
0346
0347
0348
0349
0350
0351
0352
0353 static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
0354 struct dlm_lock_resource *res,
0355 struct dlm_lock *lock, int flags, int type)
0356 {
0357 struct dlm_convert_lock convert;
0358 int tmpret;
0359 enum dlm_status ret;
0360 int status = 0;
0361 struct kvec vec[2];
0362 size_t veclen = 1;
0363
0364 mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);
0365
0366 memset(&convert, 0, sizeof(struct dlm_convert_lock));
0367 convert.node_idx = dlm->node_num;
0368 convert.requested_type = type;
0369 convert.cookie = lock->ml.cookie;
0370 convert.namelen = res->lockname.len;
0371 convert.flags = cpu_to_be32(flags);
0372 memcpy(convert.name, res->lockname.name, convert.namelen);
0373
0374 vec[0].iov_len = sizeof(struct dlm_convert_lock);
0375 vec[0].iov_base = &convert;
0376
0377 if (flags & LKM_PUT_LVB) {
0378
0379 vec[1].iov_len = DLM_LVB_LEN;
0380 vec[1].iov_base = lock->lksb->lvb;
0381 veclen++;
0382 }
0383
0384 tmpret = o2net_send_message_vec(DLM_CONVERT_LOCK_MSG, dlm->key,
0385 vec, veclen, res->owner, &status);
0386 if (tmpret >= 0) {
0387
0388 ret = status;
0389 if (ret == DLM_RECOVERING) {
0390 mlog(0, "node %u returned DLM_RECOVERING from convert "
0391 "message!\n", res->owner);
0392 } else if (ret == DLM_MIGRATING) {
0393 mlog(0, "node %u returned DLM_MIGRATING from convert "
0394 "message!\n", res->owner);
0395 } else if (ret == DLM_FORWARD) {
0396 mlog(0, "node %u returned DLM_FORWARD from convert "
0397 "message!\n", res->owner);
0398 } else if (ret != DLM_NORMAL && ret != DLM_NOTQUEUED)
0399 dlm_error(ret);
0400 } else {
0401 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
0402 "node %u\n", tmpret, DLM_CONVERT_LOCK_MSG, dlm->key,
0403 res->owner);
0404 if (dlm_is_host_down(tmpret)) {
0405
0406
0407
0408 dlm_wait_for_node_death(dlm, res->owner,
0409 DLM_NODE_DEATH_WAIT_MAX);
0410 ret = DLM_RECOVERING;
0411 mlog(0, "node %u died so returning DLM_RECOVERING "
0412 "from convert message!\n", res->owner);
0413 } else {
0414 ret = dlm_err_to_dlm_status(tmpret);
0415 }
0416 }
0417
0418 return ret;
0419 }
0420
0421
0422
0423
0424
0425
0426
0427
0428
0429 int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
0430 void **ret_data)
0431 {
0432 struct dlm_ctxt *dlm = data;
0433 struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
0434 struct dlm_lock_resource *res = NULL;
0435 struct dlm_lock *lock = NULL;
0436 struct dlm_lock *tmp_lock;
0437 struct dlm_lockstatus *lksb;
0438 enum dlm_status status = DLM_NORMAL;
0439 u32 flags;
0440 int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
0441
0442 if (!dlm_grab(dlm)) {
0443 dlm_error(DLM_REJECTED);
0444 return DLM_REJECTED;
0445 }
0446
0447 mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
0448 "Domain %s not fully joined!\n", dlm->name);
0449
0450 if (cnv->namelen > DLM_LOCKID_NAME_MAX) {
0451 status = DLM_IVBUFLEN;
0452 dlm_error(status);
0453 goto leave;
0454 }
0455
0456 flags = be32_to_cpu(cnv->flags);
0457
0458 if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
0459 (LKM_PUT_LVB|LKM_GET_LVB)) {
0460 mlog(ML_ERROR, "both PUT and GET lvb specified\n");
0461 status = DLM_BADARGS;
0462 goto leave;
0463 }
0464
0465 mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
0466 (flags & LKM_GET_LVB ? "get lvb" : "none"));
0467
0468 status = DLM_IVLOCKID;
0469 res = dlm_lookup_lockres(dlm, cnv->name, cnv->namelen);
0470 if (!res) {
0471 dlm_error(status);
0472 goto leave;
0473 }
0474
0475 spin_lock(&res->spinlock);
0476 status = __dlm_lockres_state_to_status(res);
0477 if (status != DLM_NORMAL) {
0478 spin_unlock(&res->spinlock);
0479 dlm_error(status);
0480 goto leave;
0481 }
0482 list_for_each_entry(tmp_lock, &res->granted, list) {
0483 if (tmp_lock->ml.cookie == cnv->cookie &&
0484 tmp_lock->ml.node == cnv->node_idx) {
0485 lock = tmp_lock;
0486 dlm_lock_get(lock);
0487 break;
0488 }
0489 }
0490 spin_unlock(&res->spinlock);
0491 if (!lock) {
0492 status = DLM_IVLOCKID;
0493 mlog(ML_ERROR, "did not find lock to convert on grant queue! "
0494 "cookie=%u:%llu\n",
0495 dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
0496 dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
0497 dlm_print_one_lock_resource(res);
0498 goto leave;
0499 }
0500
0501
0502 lksb = lock->lksb;
0503
0504
0505 if (flags & LKM_PUT_LVB) {
0506 BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
0507 lksb->flags |= DLM_LKSB_PUT_LVB;
0508 memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
0509 } else if (flags & LKM_GET_LVB) {
0510 BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
0511 lksb->flags |= DLM_LKSB_GET_LVB;
0512 }
0513
0514 spin_lock(&res->spinlock);
0515 status = __dlm_lockres_state_to_status(res);
0516 if (status == DLM_NORMAL) {
0517 __dlm_lockres_reserve_ast(res);
0518 ast_reserved = 1;
0519 res->state |= DLM_LOCK_RES_IN_PROGRESS;
0520 status = __dlmconvert_master(dlm, res, lock, flags,
0521 cnv->requested_type,
0522 &call_ast, &kick_thread);
0523 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0524 wake = 1;
0525 }
0526 spin_unlock(&res->spinlock);
0527 if (wake)
0528 wake_up(&res->wq);
0529
0530 if (status != DLM_NORMAL) {
0531 if (status != DLM_NOTQUEUED)
0532 dlm_error(status);
0533 lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
0534 }
0535
0536 leave:
0537 if (lock)
0538 dlm_lock_put(lock);
0539
0540
0541 if (call_ast)
0542 dlm_queue_ast(dlm, lock);
0543 else if (ast_reserved)
0544 dlm_lockres_release_ast(dlm, res);
0545
0546 if (kick_thread)
0547 dlm_kick_thread(dlm, res);
0548
0549 if (res)
0550 dlm_lockres_put(res);
0551
0552 dlm_put(dlm);
0553
0554 return status;
0555 }