Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * dlmmod.c
0004  *
0005  * standalone DLM module
0006  *
0007  * Copyright (C) 2004 Oracle.  All rights reserved.
0008  */
0009 
0010 
0011 #include <linux/module.h>
0012 #include <linux/fs.h>
0013 #include <linux/types.h>
0014 #include <linux/slab.h>
0015 #include <linux/highmem.h>
0016 #include <linux/init.h>
0017 #include <linux/sysctl.h>
0018 #include <linux/random.h>
0019 #include <linux/blkdev.h>
0020 #include <linux/socket.h>
0021 #include <linux/inet.h>
0022 #include <linux/spinlock.h>
0023 #include <linux/delay.h>
0024 
0025 
0026 #include "../cluster/heartbeat.h"
0027 #include "../cluster/nodemanager.h"
0028 #include "../cluster/tcp.h"
0029 
0030 #include "dlmapi.h"
0031 #include "dlmcommon.h"
0032 #include "dlmdomain.h"
0033 #include "dlmdebug.h"
0034 
0035 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
0036 #include "../cluster/masklog.h"
0037 
0038 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
0039                   struct dlm_master_list_entry *mle,
0040                   struct o2nm_node *node,
0041                   int idx);
0042 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
0043                 struct dlm_master_list_entry *mle,
0044                 struct o2nm_node *node,
0045                 int idx);
0046 
0047 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
0048 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
0049                 struct dlm_lock_resource *res,
0050                 void *nodemap, u32 flags);
0051 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
0052 
0053 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
0054                 struct dlm_master_list_entry *mle,
0055                 const char *name,
0056                 unsigned int namelen)
0057 {
0058     if (dlm != mle->dlm)
0059         return 0;
0060 
0061     if (namelen != mle->mnamelen ||
0062         memcmp(name, mle->mname, namelen) != 0)
0063         return 0;
0064 
0065     return 1;
0066 }
0067 
0068 static struct kmem_cache *dlm_lockres_cache;
0069 static struct kmem_cache *dlm_lockname_cache;
0070 static struct kmem_cache *dlm_mle_cache;
0071 
0072 static void dlm_mle_release(struct kref *kref);
0073 static void dlm_init_mle(struct dlm_master_list_entry *mle,
0074             enum dlm_mle_type type,
0075             struct dlm_ctxt *dlm,
0076             struct dlm_lock_resource *res,
0077             const char *name,
0078             unsigned int namelen);
0079 static void dlm_put_mle(struct dlm_master_list_entry *mle);
0080 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
0081 static int dlm_find_mle(struct dlm_ctxt *dlm,
0082             struct dlm_master_list_entry **mle,
0083             char *name, unsigned int namelen);
0084 
0085 static int dlm_do_master_request(struct dlm_lock_resource *res,
0086                  struct dlm_master_list_entry *mle, int to);
0087 
0088 
0089 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
0090                      struct dlm_lock_resource *res,
0091                      struct dlm_master_list_entry *mle,
0092                      int *blocked);
0093 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
0094                     struct dlm_lock_resource *res,
0095                     struct dlm_master_list_entry *mle,
0096                     int blocked);
0097 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
0098                  struct dlm_lock_resource *res,
0099                  struct dlm_master_list_entry *mle,
0100                  struct dlm_master_list_entry **oldmle,
0101                  const char *name, unsigned int namelen,
0102                  u8 new_master, u8 master);
0103 
0104 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
0105                     struct dlm_lock_resource *res);
0106 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
0107                       struct dlm_lock_resource *res);
0108 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
0109                        struct dlm_lock_resource *res,
0110                        u8 target);
0111 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
0112                        struct dlm_lock_resource *res);
0113 
0114 
0115 int dlm_is_host_down(int errno)
0116 {
0117     switch (errno) {
0118         case -EBADF:
0119         case -ECONNREFUSED:
0120         case -ENOTCONN:
0121         case -ECONNRESET:
0122         case -EPIPE:
0123         case -EHOSTDOWN:
0124         case -EHOSTUNREACH:
0125         case -ETIMEDOUT:
0126         case -ECONNABORTED:
0127         case -ENETDOWN:
0128         case -ENETUNREACH:
0129         case -ENETRESET:
0130         case -ESHUTDOWN:
0131         case -ENOPROTOOPT:
0132         case -EINVAL:   /* if returned from our tcp code,
0133                    this means there is no socket */
0134             return 1;
0135     }
0136     return 0;
0137 }
0138 
0139 
0140 /*
0141  * MASTER LIST FUNCTIONS
0142  */
0143 
0144 
0145 /*
0146  * regarding master list entries and heartbeat callbacks:
0147  *
0148  * in order to avoid sleeping and allocation that occurs in
0149  * heartbeat, master list entries are simply attached to the
0150  * dlm's established heartbeat callbacks.  the mle is attached
0151  * when it is created, and since the dlm->spinlock is held at
0152  * that time, any heartbeat event will be properly discovered
0153  * by the mle.  the mle needs to be detached from the
0154  * dlm->mle_hb_events list as soon as heartbeat events are no
0155  * longer useful to the mle, and before the mle is freed.
0156  *
0157  * as a general rule, heartbeat events are no longer needed by
0158  * the mle once an "answer" regarding the lock master has been
0159  * received.
0160  */
0161 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
0162                           struct dlm_master_list_entry *mle)
0163 {
0164     assert_spin_locked(&dlm->spinlock);
0165 
0166     list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
0167 }
0168 
0169 
0170 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
0171                           struct dlm_master_list_entry *mle)
0172 {
0173     if (!list_empty(&mle->hb_events))
0174         list_del_init(&mle->hb_events);
0175 }
0176 
0177 
0178 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
0179                         struct dlm_master_list_entry *mle)
0180 {
0181     spin_lock(&dlm->spinlock);
0182     __dlm_mle_detach_hb_events(dlm, mle);
0183     spin_unlock(&dlm->spinlock);
0184 }
0185 
0186 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
0187 {
0188     struct dlm_ctxt *dlm;
0189     dlm = mle->dlm;
0190 
0191     assert_spin_locked(&dlm->spinlock);
0192     assert_spin_locked(&dlm->master_lock);
0193     mle->inuse++;
0194     kref_get(&mle->mle_refs);
0195 }
0196 
0197 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
0198 {
0199     struct dlm_ctxt *dlm;
0200     dlm = mle->dlm;
0201 
0202     spin_lock(&dlm->spinlock);
0203     spin_lock(&dlm->master_lock);
0204     mle->inuse--;
0205     __dlm_put_mle(mle);
0206     spin_unlock(&dlm->master_lock);
0207     spin_unlock(&dlm->spinlock);
0208 
0209 }
0210 
0211 /* remove from list and free */
0212 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
0213 {
0214     struct dlm_ctxt *dlm;
0215     dlm = mle->dlm;
0216 
0217     assert_spin_locked(&dlm->spinlock);
0218     assert_spin_locked(&dlm->master_lock);
0219     if (!kref_read(&mle->mle_refs)) {
0220         /* this may or may not crash, but who cares.
0221          * it's a BUG. */
0222         mlog(ML_ERROR, "bad mle: %p\n", mle);
0223         dlm_print_one_mle(mle);
0224         BUG();
0225     } else
0226         kref_put(&mle->mle_refs, dlm_mle_release);
0227 }
0228 
0229 
0230 /* must not have any spinlocks coming in */
0231 static void dlm_put_mle(struct dlm_master_list_entry *mle)
0232 {
0233     struct dlm_ctxt *dlm;
0234     dlm = mle->dlm;
0235 
0236     spin_lock(&dlm->spinlock);
0237     spin_lock(&dlm->master_lock);
0238     __dlm_put_mle(mle);
0239     spin_unlock(&dlm->master_lock);
0240     spin_unlock(&dlm->spinlock);
0241 }
0242 
0243 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
0244 {
0245     kref_get(&mle->mle_refs);
0246 }
0247 
0248 static void dlm_init_mle(struct dlm_master_list_entry *mle,
0249             enum dlm_mle_type type,
0250             struct dlm_ctxt *dlm,
0251             struct dlm_lock_resource *res,
0252             const char *name,
0253             unsigned int namelen)
0254 {
0255     assert_spin_locked(&dlm->spinlock);
0256 
0257     mle->dlm = dlm;
0258     mle->type = type;
0259     INIT_HLIST_NODE(&mle->master_hash_node);
0260     INIT_LIST_HEAD(&mle->hb_events);
0261     memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
0262     spin_lock_init(&mle->spinlock);
0263     init_waitqueue_head(&mle->wq);
0264     atomic_set(&mle->woken, 0);
0265     kref_init(&mle->mle_refs);
0266     memset(mle->response_map, 0, sizeof(mle->response_map));
0267     mle->master = O2NM_MAX_NODES;
0268     mle->new_master = O2NM_MAX_NODES;
0269     mle->inuse = 0;
0270 
0271     BUG_ON(mle->type != DLM_MLE_BLOCK &&
0272            mle->type != DLM_MLE_MASTER &&
0273            mle->type != DLM_MLE_MIGRATION);
0274 
0275     if (mle->type == DLM_MLE_MASTER) {
0276         BUG_ON(!res);
0277         mle->mleres = res;
0278         memcpy(mle->mname, res->lockname.name, res->lockname.len);
0279         mle->mnamelen = res->lockname.len;
0280         mle->mnamehash = res->lockname.hash;
0281     } else {
0282         BUG_ON(!name);
0283         mle->mleres = NULL;
0284         memcpy(mle->mname, name, namelen);
0285         mle->mnamelen = namelen;
0286         mle->mnamehash = dlm_lockid_hash(name, namelen);
0287     }
0288 
0289     atomic_inc(&dlm->mle_tot_count[mle->type]);
0290     atomic_inc(&dlm->mle_cur_count[mle->type]);
0291 
0292     /* copy off the node_map and register hb callbacks on our copy */
0293     memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
0294     memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
0295     clear_bit(dlm->node_num, mle->vote_map);
0296     clear_bit(dlm->node_num, mle->node_map);
0297 
0298     /* attach the mle to the domain node up/down events */
0299     __dlm_mle_attach_hb_events(dlm, mle);
0300 }
0301 
0302 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
0303 {
0304     assert_spin_locked(&dlm->spinlock);
0305     assert_spin_locked(&dlm->master_lock);
0306 
0307     if (!hlist_unhashed(&mle->master_hash_node))
0308         hlist_del_init(&mle->master_hash_node);
0309 }
0310 
0311 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
0312 {
0313     struct hlist_head *bucket;
0314 
0315     assert_spin_locked(&dlm->master_lock);
0316 
0317     bucket = dlm_master_hash(dlm, mle->mnamehash);
0318     hlist_add_head(&mle->master_hash_node, bucket);
0319 }
0320 
0321 /* returns 1 if found, 0 if not */
0322 static int dlm_find_mle(struct dlm_ctxt *dlm,
0323             struct dlm_master_list_entry **mle,
0324             char *name, unsigned int namelen)
0325 {
0326     struct dlm_master_list_entry *tmpmle;
0327     struct hlist_head *bucket;
0328     unsigned int hash;
0329 
0330     assert_spin_locked(&dlm->master_lock);
0331 
0332     hash = dlm_lockid_hash(name, namelen);
0333     bucket = dlm_master_hash(dlm, hash);
0334     hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
0335         if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
0336             continue;
0337         dlm_get_mle(tmpmle);
0338         *mle = tmpmle;
0339         return 1;
0340     }
0341     return 0;
0342 }
0343 
0344 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
0345 {
0346     struct dlm_master_list_entry *mle;
0347 
0348     assert_spin_locked(&dlm->spinlock);
0349 
0350     list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
0351         if (node_up)
0352             dlm_mle_node_up(dlm, mle, NULL, idx);
0353         else
0354             dlm_mle_node_down(dlm, mle, NULL, idx);
0355     }
0356 }
0357 
0358 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
0359                   struct dlm_master_list_entry *mle,
0360                   struct o2nm_node *node, int idx)
0361 {
0362     spin_lock(&mle->spinlock);
0363 
0364     if (!test_bit(idx, mle->node_map))
0365         mlog(0, "node %u already removed from nodemap!\n", idx);
0366     else
0367         clear_bit(idx, mle->node_map);
0368 
0369     spin_unlock(&mle->spinlock);
0370 }
0371 
0372 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
0373                 struct dlm_master_list_entry *mle,
0374                 struct o2nm_node *node, int idx)
0375 {
0376     spin_lock(&mle->spinlock);
0377 
0378     if (test_bit(idx, mle->node_map))
0379         mlog(0, "node %u already in node map!\n", idx);
0380     else
0381         set_bit(idx, mle->node_map);
0382 
0383     spin_unlock(&mle->spinlock);
0384 }
0385 
0386 
0387 int dlm_init_mle_cache(void)
0388 {
0389     dlm_mle_cache = kmem_cache_create("o2dlm_mle",
0390                       sizeof(struct dlm_master_list_entry),
0391                       0, SLAB_HWCACHE_ALIGN,
0392                       NULL);
0393     if (dlm_mle_cache == NULL)
0394         return -ENOMEM;
0395     return 0;
0396 }
0397 
0398 void dlm_destroy_mle_cache(void)
0399 {
0400     kmem_cache_destroy(dlm_mle_cache);
0401 }
0402 
0403 static void dlm_mle_release(struct kref *kref)
0404 {
0405     struct dlm_master_list_entry *mle;
0406     struct dlm_ctxt *dlm;
0407 
0408     mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
0409     dlm = mle->dlm;
0410 
0411     assert_spin_locked(&dlm->spinlock);
0412     assert_spin_locked(&dlm->master_lock);
0413 
0414     mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
0415          mle->type);
0416 
0417     /* remove from list if not already */
0418     __dlm_unlink_mle(dlm, mle);
0419 
0420     /* detach the mle from the domain node up/down events */
0421     __dlm_mle_detach_hb_events(dlm, mle);
0422 
0423     atomic_dec(&dlm->mle_cur_count[mle->type]);
0424 
0425     /* NOTE: kfree under spinlock here.
0426      * if this is bad, we can move this to a freelist. */
0427     kmem_cache_free(dlm_mle_cache, mle);
0428 }
0429 
0430 
0431 /*
0432  * LOCK RESOURCE FUNCTIONS
0433  */
0434 
0435 int dlm_init_master_caches(void)
0436 {
0437     dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
0438                           sizeof(struct dlm_lock_resource),
0439                           0, SLAB_HWCACHE_ALIGN, NULL);
0440     if (!dlm_lockres_cache)
0441         goto bail;
0442 
0443     dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
0444                            DLM_LOCKID_NAME_MAX, 0,
0445                            SLAB_HWCACHE_ALIGN, NULL);
0446     if (!dlm_lockname_cache)
0447         goto bail;
0448 
0449     return 0;
0450 bail:
0451     dlm_destroy_master_caches();
0452     return -ENOMEM;
0453 }
0454 
0455 void dlm_destroy_master_caches(void)
0456 {
0457     kmem_cache_destroy(dlm_lockname_cache);
0458     dlm_lockname_cache = NULL;
0459 
0460     kmem_cache_destroy(dlm_lockres_cache);
0461     dlm_lockres_cache = NULL;
0462 }
0463 
0464 static void dlm_lockres_release(struct kref *kref)
0465 {
0466     struct dlm_lock_resource *res;
0467     struct dlm_ctxt *dlm;
0468 
0469     res = container_of(kref, struct dlm_lock_resource, refs);
0470     dlm = res->dlm;
0471 
0472     /* This should not happen -- all lockres' have a name
0473      * associated with them at init time. */
0474     BUG_ON(!res->lockname.name);
0475 
0476     mlog(0, "destroying lockres %.*s\n", res->lockname.len,
0477          res->lockname.name);
0478 
0479     atomic_dec(&dlm->res_cur_count);
0480 
0481     if (!hlist_unhashed(&res->hash_node) ||
0482         !list_empty(&res->granted) ||
0483         !list_empty(&res->converting) ||
0484         !list_empty(&res->blocked) ||
0485         !list_empty(&res->dirty) ||
0486         !list_empty(&res->recovering) ||
0487         !list_empty(&res->purge)) {
0488         mlog(ML_ERROR,
0489              "Going to BUG for resource %.*s."
0490              "  We're on a list! [%c%c%c%c%c%c%c]\n",
0491              res->lockname.len, res->lockname.name,
0492              !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
0493              !list_empty(&res->granted) ? 'G' : ' ',
0494              !list_empty(&res->converting) ? 'C' : ' ',
0495              !list_empty(&res->blocked) ? 'B' : ' ',
0496              !list_empty(&res->dirty) ? 'D' : ' ',
0497              !list_empty(&res->recovering) ? 'R' : ' ',
0498              !list_empty(&res->purge) ? 'P' : ' ');
0499 
0500         dlm_print_one_lock_resource(res);
0501     }
0502 
0503     /* By the time we're ready to blow this guy away, we shouldn't
0504      * be on any lists. */
0505     BUG_ON(!hlist_unhashed(&res->hash_node));
0506     BUG_ON(!list_empty(&res->granted));
0507     BUG_ON(!list_empty(&res->converting));
0508     BUG_ON(!list_empty(&res->blocked));
0509     BUG_ON(!list_empty(&res->dirty));
0510     BUG_ON(!list_empty(&res->recovering));
0511     BUG_ON(!list_empty(&res->purge));
0512 
0513     kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
0514 
0515     kmem_cache_free(dlm_lockres_cache, res);
0516 }
0517 
0518 void dlm_lockres_put(struct dlm_lock_resource *res)
0519 {
0520     kref_put(&res->refs, dlm_lockres_release);
0521 }
0522 
0523 static void dlm_init_lockres(struct dlm_ctxt *dlm,
0524                  struct dlm_lock_resource *res,
0525                  const char *name, unsigned int namelen)
0526 {
0527     char *qname;
0528 
0529     /* If we memset here, we lose our reference to the kmalloc'd
0530      * res->lockname.name, so be sure to init every field
0531      * correctly! */
0532 
0533     qname = (char *) res->lockname.name;
0534     memcpy(qname, name, namelen);
0535 
0536     res->lockname.len = namelen;
0537     res->lockname.hash = dlm_lockid_hash(name, namelen);
0538 
0539     init_waitqueue_head(&res->wq);
0540     spin_lock_init(&res->spinlock);
0541     INIT_HLIST_NODE(&res->hash_node);
0542     INIT_LIST_HEAD(&res->granted);
0543     INIT_LIST_HEAD(&res->converting);
0544     INIT_LIST_HEAD(&res->blocked);
0545     INIT_LIST_HEAD(&res->dirty);
0546     INIT_LIST_HEAD(&res->recovering);
0547     INIT_LIST_HEAD(&res->purge);
0548     INIT_LIST_HEAD(&res->tracking);
0549     atomic_set(&res->asts_reserved, 0);
0550     res->migration_pending = 0;
0551     res->inflight_locks = 0;
0552     res->inflight_assert_workers = 0;
0553 
0554     res->dlm = dlm;
0555 
0556     kref_init(&res->refs);
0557 
0558     atomic_inc(&dlm->res_tot_count);
0559     atomic_inc(&dlm->res_cur_count);
0560 
0561     /* just for consistency */
0562     spin_lock(&res->spinlock);
0563     dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
0564     spin_unlock(&res->spinlock);
0565 
0566     res->state = DLM_LOCK_RES_IN_PROGRESS;
0567 
0568     res->last_used = 0;
0569 
0570     spin_lock(&dlm->track_lock);
0571     list_add_tail(&res->tracking, &dlm->tracking_list);
0572     spin_unlock(&dlm->track_lock);
0573 
0574     memset(res->lvb, 0, DLM_LVB_LEN);
0575     memset(res->refmap, 0, sizeof(res->refmap));
0576 }
0577 
0578 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
0579                    const char *name,
0580                    unsigned int namelen)
0581 {
0582     struct dlm_lock_resource *res = NULL;
0583 
0584     res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
0585     if (!res)
0586         goto error;
0587 
0588     res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
0589     if (!res->lockname.name)
0590         goto error;
0591 
0592     dlm_init_lockres(dlm, res, name, namelen);
0593     return res;
0594 
0595 error:
0596     if (res)
0597         kmem_cache_free(dlm_lockres_cache, res);
0598     return NULL;
0599 }
0600 
0601 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
0602                 struct dlm_lock_resource *res, int bit)
0603 {
0604     assert_spin_locked(&res->spinlock);
0605 
0606     mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
0607          res->lockname.name, bit, __builtin_return_address(0));
0608 
0609     set_bit(bit, res->refmap);
0610 }
0611 
0612 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
0613                   struct dlm_lock_resource *res, int bit)
0614 {
0615     assert_spin_locked(&res->spinlock);
0616 
0617     mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
0618          res->lockname.name, bit, __builtin_return_address(0));
0619 
0620     clear_bit(bit, res->refmap);
0621 }
0622 
0623 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
0624                    struct dlm_lock_resource *res)
0625 {
0626     res->inflight_locks++;
0627 
0628     mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
0629          res->lockname.len, res->lockname.name, res->inflight_locks,
0630          __builtin_return_address(0));
0631 }
0632 
0633 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
0634                    struct dlm_lock_resource *res)
0635 {
0636     assert_spin_locked(&res->spinlock);
0637     __dlm_lockres_grab_inflight_ref(dlm, res);
0638 }
0639 
0640 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
0641                    struct dlm_lock_resource *res)
0642 {
0643     assert_spin_locked(&res->spinlock);
0644 
0645     BUG_ON(res->inflight_locks == 0);
0646 
0647     res->inflight_locks--;
0648 
0649     mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
0650          res->lockname.len, res->lockname.name, res->inflight_locks,
0651          __builtin_return_address(0));
0652 
0653     wake_up(&res->wq);
0654 }
0655 
0656 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
0657         struct dlm_lock_resource *res)
0658 {
0659     assert_spin_locked(&res->spinlock);
0660     res->inflight_assert_workers++;
0661     mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
0662             dlm->name, res->lockname.len, res->lockname.name,
0663             res->inflight_assert_workers);
0664 }
0665 
0666 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
0667         struct dlm_lock_resource *res)
0668 {
0669     assert_spin_locked(&res->spinlock);
0670     BUG_ON(res->inflight_assert_workers == 0);
0671     res->inflight_assert_workers--;
0672     mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
0673             dlm->name, res->lockname.len, res->lockname.name,
0674             res->inflight_assert_workers);
0675 }
0676 
0677 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
0678         struct dlm_lock_resource *res)
0679 {
0680     spin_lock(&res->spinlock);
0681     __dlm_lockres_drop_inflight_worker(dlm, res);
0682     spin_unlock(&res->spinlock);
0683 }
0684 
0685 /*
0686  * lookup a lock resource by name.
0687  * may already exist in the hashtable.
0688  * lockid is null terminated
0689  *
0690  * if not, allocate enough for the lockres and for
0691  * the temporary structure used in doing the mastering.
0692  *
0693  * also, do a lookup in the dlm->master_list to see
0694  * if another node has begun mastering the same lock.
0695  * if so, there should be a block entry in there
0696  * for this name, and we should *not* attempt to master
0697  * the lock here.   need to wait around for that node
0698  * to assert_master (or die).
0699  *
0700  */
0701 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
0702                       const char *lockid,
0703                       int namelen,
0704                       int flags)
0705 {
0706     struct dlm_lock_resource *tmpres=NULL, *res=NULL;
0707     struct dlm_master_list_entry *mle = NULL;
0708     struct dlm_master_list_entry *alloc_mle = NULL;
0709     int blocked = 0;
0710     int ret, nodenum;
0711     struct dlm_node_iter iter;
0712     unsigned int hash;
0713     int tries = 0;
0714     int bit, wait_on_recovery = 0;
0715 
0716     BUG_ON(!lockid);
0717 
0718     hash = dlm_lockid_hash(lockid, namelen);
0719 
0720     mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
0721 
0722 lookup:
0723     spin_lock(&dlm->spinlock);
0724     tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
0725     if (tmpres) {
0726         spin_unlock(&dlm->spinlock);
0727         spin_lock(&tmpres->spinlock);
0728 
0729         /*
0730          * Right after dlm spinlock was released, dlm_thread could have
0731          * purged the lockres. Check if lockres got unhashed. If so
0732          * start over.
0733          */
0734         if (hlist_unhashed(&tmpres->hash_node)) {
0735             spin_unlock(&tmpres->spinlock);
0736             dlm_lockres_put(tmpres);
0737             tmpres = NULL;
0738             goto lookup;
0739         }
0740 
0741         /* Wait on the thread that is mastering the resource */
0742         if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
0743             __dlm_wait_on_lockres(tmpres);
0744             BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
0745             spin_unlock(&tmpres->spinlock);
0746             dlm_lockres_put(tmpres);
0747             tmpres = NULL;
0748             goto lookup;
0749         }
0750 
0751         /* Wait on the resource purge to complete before continuing */
0752         if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
0753             BUG_ON(tmpres->owner == dlm->node_num);
0754             __dlm_wait_on_lockres_flags(tmpres,
0755                             DLM_LOCK_RES_DROPPING_REF);
0756             spin_unlock(&tmpres->spinlock);
0757             dlm_lockres_put(tmpres);
0758             tmpres = NULL;
0759             goto lookup;
0760         }
0761 
0762         /* Grab inflight ref to pin the resource */
0763         dlm_lockres_grab_inflight_ref(dlm, tmpres);
0764 
0765         spin_unlock(&tmpres->spinlock);
0766         if (res) {
0767             spin_lock(&dlm->track_lock);
0768             if (!list_empty(&res->tracking))
0769                 list_del_init(&res->tracking);
0770             else
0771                 mlog(ML_ERROR, "Resource %.*s not "
0772                         "on the Tracking list\n",
0773                         res->lockname.len,
0774                         res->lockname.name);
0775             spin_unlock(&dlm->track_lock);
0776             dlm_lockres_put(res);
0777         }
0778         res = tmpres;
0779         goto leave;
0780     }
0781 
0782     if (!res) {
0783         spin_unlock(&dlm->spinlock);
0784         mlog(0, "allocating a new resource\n");
0785         /* nothing found and we need to allocate one. */
0786         alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
0787         if (!alloc_mle)
0788             goto leave;
0789         res = dlm_new_lockres(dlm, lockid, namelen);
0790         if (!res)
0791             goto leave;
0792         goto lookup;
0793     }
0794 
0795     mlog(0, "no lockres found, allocated our own: %p\n", res);
0796 
0797     if (flags & LKM_LOCAL) {
0798         /* caller knows it's safe to assume it's not mastered elsewhere
0799          * DONE!  return right away */
0800         spin_lock(&res->spinlock);
0801         dlm_change_lockres_owner(dlm, res, dlm->node_num);
0802         __dlm_insert_lockres(dlm, res);
0803         dlm_lockres_grab_inflight_ref(dlm, res);
0804         spin_unlock(&res->spinlock);
0805         spin_unlock(&dlm->spinlock);
0806         /* lockres still marked IN_PROGRESS */
0807         goto wake_waiters;
0808     }
0809 
0810     /* check master list to see if another node has started mastering it */
0811     spin_lock(&dlm->master_lock);
0812 
0813     /* if we found a block, wait for lock to be mastered by another node */
0814     blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
0815     if (blocked) {
0816         int mig;
0817         if (mle->type == DLM_MLE_MASTER) {
0818             mlog(ML_ERROR, "master entry for nonexistent lock!\n");
0819             BUG();
0820         }
0821         mig = (mle->type == DLM_MLE_MIGRATION);
0822         /* if there is a migration in progress, let the migration
0823          * finish before continuing.  we can wait for the absence
0824          * of the MIGRATION mle: either the migrate finished or
0825          * one of the nodes died and the mle was cleaned up.
0826          * if there is a BLOCK here, but it already has a master
0827          * set, we are too late.  the master does not have a ref
0828          * for us in the refmap.  detach the mle and drop it.
0829          * either way, go back to the top and start over. */
0830         if (mig || mle->master != O2NM_MAX_NODES) {
0831             BUG_ON(mig && mle->master == dlm->node_num);
0832             /* we arrived too late.  the master does not
0833              * have a ref for us. retry. */
0834             mlog(0, "%s:%.*s: late on %s\n",
0835                  dlm->name, namelen, lockid,
0836                  mig ?  "MIGRATION" : "BLOCK");
0837             spin_unlock(&dlm->master_lock);
0838             spin_unlock(&dlm->spinlock);
0839 
0840             /* master is known, detach */
0841             if (!mig)
0842                 dlm_mle_detach_hb_events(dlm, mle);
0843             dlm_put_mle(mle);
0844             mle = NULL;
0845             /* this is lame, but we can't wait on either
0846              * the mle or lockres waitqueue here */
0847             if (mig)
0848                 msleep(100);
0849             goto lookup;
0850         }
0851     } else {
0852         /* go ahead and try to master lock on this node */
0853         mle = alloc_mle;
0854         /* make sure this does not get freed below */
0855         alloc_mle = NULL;
0856         dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
0857         set_bit(dlm->node_num, mle->maybe_map);
0858         __dlm_insert_mle(dlm, mle);
0859 
0860         /* still holding the dlm spinlock, check the recovery map
0861          * to see if there are any nodes that still need to be
0862          * considered.  these will not appear in the mle nodemap
0863          * but they might own this lockres.  wait on them. */
0864         bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
0865         if (bit < O2NM_MAX_NODES) {
0866             mlog(0, "%s: res %.*s, At least one node (%d) "
0867                  "to recover before lock mastery can begin\n",
0868                  dlm->name, namelen, (char *)lockid, bit);
0869             wait_on_recovery = 1;
0870         }
0871     }
0872 
0873     /* at this point there is either a DLM_MLE_BLOCK or a
0874      * DLM_MLE_MASTER on the master list, so it's safe to add the
0875      * lockres to the hashtable.  anyone who finds the lock will
0876      * still have to wait on the IN_PROGRESS. */
0877 
0878     /* finally add the lockres to its hash bucket */
0879     __dlm_insert_lockres(dlm, res);
0880 
0881     /* since this lockres is new it doesn't not require the spinlock */
0882     __dlm_lockres_grab_inflight_ref(dlm, res);
0883 
0884     /* get an extra ref on the mle in case this is a BLOCK
0885      * if so, the creator of the BLOCK may try to put the last
0886      * ref at this time in the assert master handler, so we
0887      * need an extra one to keep from a bad ptr deref. */
0888     dlm_get_mle_inuse(mle);
0889     spin_unlock(&dlm->master_lock);
0890     spin_unlock(&dlm->spinlock);
0891 
0892 redo_request:
0893     while (wait_on_recovery) {
0894         /* any cluster changes that occurred after dropping the
0895          * dlm spinlock would be detectable be a change on the mle,
0896          * so we only need to clear out the recovery map once. */
0897         if (dlm_is_recovery_lock(lockid, namelen)) {
0898             mlog(0, "%s: Recovery map is not empty, but must "
0899                  "master $RECOVERY lock now\n", dlm->name);
0900             if (!dlm_pre_master_reco_lockres(dlm, res))
0901                 wait_on_recovery = 0;
0902             else {
0903                 mlog(0, "%s: waiting 500ms for heartbeat state "
0904                     "change\n", dlm->name);
0905                 msleep(500);
0906             }
0907             continue;
0908         }
0909 
0910         dlm_kick_recovery_thread(dlm);
0911         msleep(1000);
0912         dlm_wait_for_recovery(dlm);
0913 
0914         spin_lock(&dlm->spinlock);
0915         bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
0916         if (bit < O2NM_MAX_NODES) {
0917             mlog(0, "%s: res %.*s, At least one node (%d) "
0918                  "to recover before lock mastery can begin\n",
0919                  dlm->name, namelen, (char *)lockid, bit);
0920             wait_on_recovery = 1;
0921         } else
0922             wait_on_recovery = 0;
0923         spin_unlock(&dlm->spinlock);
0924 
0925         if (wait_on_recovery)
0926             dlm_wait_for_node_recovery(dlm, bit, 10000);
0927     }
0928 
0929     /* must wait for lock to be mastered elsewhere */
0930     if (blocked)
0931         goto wait;
0932 
0933     ret = -EINVAL;
0934     dlm_node_iter_init(mle->vote_map, &iter);
0935     while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
0936         ret = dlm_do_master_request(res, mle, nodenum);
0937         if (ret < 0)
0938             mlog_errno(ret);
0939         if (mle->master != O2NM_MAX_NODES) {
0940             /* found a master ! */
0941             if (mle->master <= nodenum)
0942                 break;
0943             /* if our master request has not reached the master
0944              * yet, keep going until it does.  this is how the
0945              * master will know that asserts are needed back to
0946              * the lower nodes. */
0947             mlog(0, "%s: res %.*s, Requests only up to %u but "
0948                  "master is %u, keep going\n", dlm->name, namelen,
0949                  lockid, nodenum, mle->master);
0950         }
0951     }
0952 
0953 wait:
0954     /* keep going until the response map includes all nodes */
0955     ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
0956     if (ret < 0) {
0957         wait_on_recovery = 1;
0958         mlog(0, "%s: res %.*s, Node map changed, redo the master "
0959              "request now, blocked=%d\n", dlm->name, res->lockname.len,
0960              res->lockname.name, blocked);
0961         if (++tries > 20) {
0962             mlog(ML_ERROR, "%s: res %.*s, Spinning on "
0963                  "dlm_wait_for_lock_mastery, blocked = %d\n",
0964                  dlm->name, res->lockname.len,
0965                  res->lockname.name, blocked);
0966             dlm_print_one_lock_resource(res);
0967             dlm_print_one_mle(mle);
0968             tries = 0;
0969         }
0970         goto redo_request;
0971     }
0972 
0973     mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
0974          res->lockname.name, res->owner);
0975     /* make sure we never continue without this */
0976     BUG_ON(res->owner == O2NM_MAX_NODES);
0977 
0978     /* master is known, detach if not already detached */
0979     dlm_mle_detach_hb_events(dlm, mle);
0980     dlm_put_mle(mle);
0981     /* put the extra ref */
0982     dlm_put_mle_inuse(mle);
0983 
0984 wake_waiters:
0985     spin_lock(&res->spinlock);
0986     res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
0987     spin_unlock(&res->spinlock);
0988     wake_up(&res->wq);
0989 
0990 leave:
0991     /* need to free the unused mle */
0992     if (alloc_mle)
0993         kmem_cache_free(dlm_mle_cache, alloc_mle);
0994 
0995     return res;
0996 }
0997 
0998 
0999 #define DLM_MASTERY_TIMEOUT_MS   5000
1000 
1001 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1002                      struct dlm_lock_resource *res,
1003                      struct dlm_master_list_entry *mle,
1004                      int *blocked)
1005 {
1006     u8 m;
1007     int ret, bit;
1008     int map_changed, voting_done;
1009     int assert, sleep;
1010 
1011 recheck:
1012     ret = 0;
1013     assert = 0;
1014 
1015     /* check if another node has already become the owner */
1016     spin_lock(&res->spinlock);
1017     if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1018         mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1019              res->lockname.len, res->lockname.name, res->owner);
1020         spin_unlock(&res->spinlock);
1021         /* this will cause the master to re-assert across
1022          * the whole cluster, freeing up mles */
1023         if (res->owner != dlm->node_num) {
1024             ret = dlm_do_master_request(res, mle, res->owner);
1025             if (ret < 0) {
1026                 /* give recovery a chance to run */
1027                 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1028                 msleep(500);
1029                 goto recheck;
1030             }
1031         }
1032         ret = 0;
1033         goto leave;
1034     }
1035     spin_unlock(&res->spinlock);
1036 
1037     spin_lock(&mle->spinlock);
1038     m = mle->master;
1039     map_changed = (memcmp(mle->vote_map, mle->node_map,
1040                   sizeof(mle->vote_map)) != 0);
1041     voting_done = (memcmp(mle->vote_map, mle->response_map,
1042                  sizeof(mle->vote_map)) == 0);
1043 
1044     /* restart if we hit any errors */
1045     if (map_changed) {
1046         int b;
1047         mlog(0, "%s: %.*s: node map changed, restarting\n",
1048              dlm->name, res->lockname.len, res->lockname.name);
1049         ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1050         b = (mle->type == DLM_MLE_BLOCK);
1051         if ((*blocked && !b) || (!*blocked && b)) {
1052             mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1053                  dlm->name, res->lockname.len, res->lockname.name,
1054                  *blocked, b);
1055             *blocked = b;
1056         }
1057         spin_unlock(&mle->spinlock);
1058         if (ret < 0) {
1059             mlog_errno(ret);
1060             goto leave;
1061         }
1062         mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1063              "rechecking now\n", dlm->name, res->lockname.len,
1064              res->lockname.name);
1065         goto recheck;
1066     } else {
1067         if (!voting_done) {
1068             mlog(0, "map not changed and voting not done "
1069                  "for %s:%.*s\n", dlm->name, res->lockname.len,
1070                  res->lockname.name);
1071         }
1072     }
1073 
1074     if (m != O2NM_MAX_NODES) {
1075         /* another node has done an assert!
1076          * all done! */
1077         sleep = 0;
1078     } else {
1079         sleep = 1;
1080         /* have all nodes responded? */
1081         if (voting_done && !*blocked) {
1082             bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
1083             if (dlm->node_num <= bit) {
1084                 /* my node number is lowest.
1085                  * now tell other nodes that I am
1086                  * mastering this. */
1087                 mle->master = dlm->node_num;
1088                 /* ref was grabbed in get_lock_resource
1089                  * will be dropped in dlmlock_master */
1090                 assert = 1;
1091                 sleep = 0;
1092             }
1093             /* if voting is done, but we have not received
1094              * an assert master yet, we must sleep */
1095         }
1096     }
1097 
1098     spin_unlock(&mle->spinlock);
1099 
1100     /* sleep if we haven't finished voting yet */
1101     if (sleep) {
1102         unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1103         atomic_set(&mle->woken, 0);
1104         (void)wait_event_timeout(mle->wq,
1105                      (atomic_read(&mle->woken) == 1),
1106                      timeo);
1107         if (res->owner == O2NM_MAX_NODES) {
1108             mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1109                  res->lockname.len, res->lockname.name);
1110             goto recheck;
1111         }
1112         mlog(0, "done waiting, master is %u\n", res->owner);
1113         ret = 0;
1114         goto leave;
1115     }
1116 
1117     ret = 0;   /* done */
1118     if (assert) {
1119         m = dlm->node_num;
1120         mlog(0, "about to master %.*s here, this=%u\n",
1121              res->lockname.len, res->lockname.name, m);
1122         ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1123         if (ret) {
1124             /* This is a failure in the network path,
1125              * not in the response to the assert_master
1126              * (any nonzero response is a BUG on this node).
1127              * Most likely a socket just got disconnected
1128              * due to node death. */
1129             mlog_errno(ret);
1130         }
1131         /* no longer need to restart lock mastery.
1132          * all living nodes have been contacted. */
1133         ret = 0;
1134     }
1135 
1136     /* set the lockres owner */
1137     spin_lock(&res->spinlock);
1138     /* mastery reference obtained either during
1139      * assert_master_handler or in get_lock_resource */
1140     dlm_change_lockres_owner(dlm, res, m);
1141     spin_unlock(&res->spinlock);
1142 
1143 leave:
1144     return ret;
1145 }
1146 
1147 struct dlm_bitmap_diff_iter
1148 {
1149     int curnode;
1150     unsigned long *orig_bm;
1151     unsigned long *cur_bm;
1152     unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1153 };
1154 
1155 enum dlm_node_state_change
1156 {
1157     NODE_DOWN = -1,
1158     NODE_NO_CHANGE = 0,
1159     NODE_UP
1160 };
1161 
1162 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1163                       unsigned long *orig_bm,
1164                       unsigned long *cur_bm)
1165 {
1166     unsigned long p1, p2;
1167     int i;
1168 
1169     iter->curnode = -1;
1170     iter->orig_bm = orig_bm;
1171     iter->cur_bm = cur_bm;
1172 
1173     for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1174             p1 = *(iter->orig_bm + i);
1175             p2 = *(iter->cur_bm + i);
1176         iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1177     }
1178 }
1179 
1180 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1181                      enum dlm_node_state_change *state)
1182 {
1183     int bit;
1184 
1185     if (iter->curnode >= O2NM_MAX_NODES)
1186         return -ENOENT;
1187 
1188     bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1189                 iter->curnode+1);
1190     if (bit >= O2NM_MAX_NODES) {
1191         iter->curnode = O2NM_MAX_NODES;
1192         return -ENOENT;
1193     }
1194 
1195     /* if it was there in the original then this node died */
1196     if (test_bit(bit, iter->orig_bm))
1197         *state = NODE_DOWN;
1198     else
1199         *state = NODE_UP;
1200 
1201     iter->curnode = bit;
1202     return bit;
1203 }
1204 
1205 
1206 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1207                     struct dlm_lock_resource *res,
1208                     struct dlm_master_list_entry *mle,
1209                     int blocked)
1210 {
1211     struct dlm_bitmap_diff_iter bdi;
1212     enum dlm_node_state_change sc;
1213     int node;
1214     int ret = 0;
1215 
1216     mlog(0, "something happened such that the "
1217          "master process may need to be restarted!\n");
1218 
1219     assert_spin_locked(&mle->spinlock);
1220 
1221     dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1222     node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1223     while (node >= 0) {
1224         if (sc == NODE_UP) {
1225             /* a node came up.  clear any old vote from
1226              * the response map and set it in the vote map
1227              * then restart the mastery. */
1228             mlog(ML_NOTICE, "node %d up while restarting\n", node);
1229 
1230             /* redo the master request, but only for the new node */
1231             mlog(0, "sending request to new node\n");
1232             clear_bit(node, mle->response_map);
1233             set_bit(node, mle->vote_map);
1234         } else {
1235             mlog(ML_ERROR, "node down! %d\n", node);
1236             if (blocked) {
1237                 int lowest = find_first_bit(mle->maybe_map,
1238                                O2NM_MAX_NODES);
1239 
1240                 /* act like it was never there */
1241                 clear_bit(node, mle->maybe_map);
1242 
1243                     if (node == lowest) {
1244                     mlog(0, "expected master %u died"
1245                         " while this node was blocked "
1246                         "waiting on it!\n", node);
1247                     lowest = find_next_bit(mle->maybe_map,
1248                                 O2NM_MAX_NODES,
1249                                 lowest+1);
1250                     if (lowest < O2NM_MAX_NODES) {
1251                         mlog(0, "%s:%.*s:still "
1252                              "blocked. waiting on %u "
1253                              "now\n", dlm->name,
1254                              res->lockname.len,
1255                              res->lockname.name,
1256                              lowest);
1257                     } else {
1258                         /* mle is an MLE_BLOCK, but
1259                          * there is now nothing left to
1260                          * block on.  we need to return
1261                          * all the way back out and try
1262                          * again with an MLE_MASTER.
1263                          * dlm_do_local_recovery_cleanup
1264                          * has already run, so the mle
1265                          * refcount is ok */
1266                         mlog(0, "%s:%.*s: no "
1267                              "longer blocking. try to "
1268                              "master this here\n",
1269                              dlm->name,
1270                              res->lockname.len,
1271                              res->lockname.name);
1272                         mle->type = DLM_MLE_MASTER;
1273                         mle->mleres = res;
1274                     }
1275                 }
1276             }
1277 
1278             /* now blank out everything, as if we had never
1279              * contacted anyone */
1280             memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1281             memset(mle->response_map, 0, sizeof(mle->response_map));
1282             /* reset the vote_map to the current node_map */
1283             memcpy(mle->vote_map, mle->node_map,
1284                    sizeof(mle->node_map));
1285             /* put myself into the maybe map */
1286             if (mle->type != DLM_MLE_BLOCK)
1287                 set_bit(dlm->node_num, mle->maybe_map);
1288         }
1289         ret = -EAGAIN;
1290         node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1291     }
1292     return ret;
1293 }
1294 
1295 
1296 /*
1297  * DLM_MASTER_REQUEST_MSG
1298  *
1299  * returns: 0 on success,
1300  *          -errno on a network error
1301  *
1302  * on error, the caller should assume the target node is "dead"
1303  *
1304  */
1305 
1306 static int dlm_do_master_request(struct dlm_lock_resource *res,
1307                  struct dlm_master_list_entry *mle, int to)
1308 {
1309     struct dlm_ctxt *dlm = mle->dlm;
1310     struct dlm_master_request request;
1311     int ret, response=0, resend;
1312 
1313     memset(&request, 0, sizeof(request));
1314     request.node_idx = dlm->node_num;
1315 
1316     BUG_ON(mle->type == DLM_MLE_MIGRATION);
1317 
1318     request.namelen = (u8)mle->mnamelen;
1319     memcpy(request.name, mle->mname, request.namelen);
1320 
1321 again:
1322     ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1323                  sizeof(request), to, &response);
1324     if (ret < 0)  {
1325         if (ret == -ESRCH) {
1326             /* should never happen */
1327             mlog(ML_ERROR, "TCP stack not ready!\n");
1328             BUG();
1329         } else if (ret == -EINVAL) {
1330             mlog(ML_ERROR, "bad args passed to o2net!\n");
1331             BUG();
1332         } else if (ret == -ENOMEM) {
1333             mlog(ML_ERROR, "out of memory while trying to send "
1334                  "network message!  retrying\n");
1335             /* this is totally crude */
1336             msleep(50);
1337             goto again;
1338         } else if (!dlm_is_host_down(ret)) {
1339             /* not a network error. bad. */
1340             mlog_errno(ret);
1341             mlog(ML_ERROR, "unhandled error!");
1342             BUG();
1343         }
1344         /* all other errors should be network errors,
1345          * and likely indicate node death */
1346         mlog(ML_ERROR, "link to %d went down!\n", to);
1347         goto out;
1348     }
1349 
1350     ret = 0;
1351     resend = 0;
1352     spin_lock(&mle->spinlock);
1353     switch (response) {
1354         case DLM_MASTER_RESP_YES:
1355             set_bit(to, mle->response_map);
1356             mlog(0, "node %u is the master, response=YES\n", to);
1357             mlog(0, "%s:%.*s: master node %u now knows I have a "
1358                  "reference\n", dlm->name, res->lockname.len,
1359                  res->lockname.name, to);
1360             mle->master = to;
1361             break;
1362         case DLM_MASTER_RESP_NO:
1363             mlog(0, "node %u not master, response=NO\n", to);
1364             set_bit(to, mle->response_map);
1365             break;
1366         case DLM_MASTER_RESP_MAYBE:
1367             mlog(0, "node %u not master, response=MAYBE\n", to);
1368             set_bit(to, mle->response_map);
1369             set_bit(to, mle->maybe_map);
1370             break;
1371         case DLM_MASTER_RESP_ERROR:
1372             mlog(0, "node %u hit an error, resending\n", to);
1373             resend = 1;
1374             response = 0;
1375             break;
1376         default:
1377             mlog(ML_ERROR, "bad response! %u\n", response);
1378             BUG();
1379     }
1380     spin_unlock(&mle->spinlock);
1381     if (resend) {
1382         /* this is also totally crude */
1383         msleep(50);
1384         goto again;
1385     }
1386 
1387 out:
1388     return ret;
1389 }
1390 
1391 /*
1392  * locks that can be taken here:
1393  * dlm->spinlock
1394  * res->spinlock
1395  * mle->spinlock
1396  * dlm->master_list
1397  *
1398  * if possible, TRIM THIS DOWN!!!
1399  */
1400 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1401                    void **ret_data)
1402 {
1403     u8 response = DLM_MASTER_RESP_MAYBE;
1404     struct dlm_ctxt *dlm = data;
1405     struct dlm_lock_resource *res = NULL;
1406     struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1407     struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1408     char *name;
1409     unsigned int namelen, hash;
1410     int found, ret;
1411     int set_maybe;
1412     int dispatch_assert = 0;
1413     int dispatched = 0;
1414 
1415     if (!dlm_grab(dlm))
1416         return DLM_MASTER_RESP_NO;
1417 
1418     if (!dlm_domain_fully_joined(dlm)) {
1419         response = DLM_MASTER_RESP_NO;
1420         goto send_response;
1421     }
1422 
1423     name = request->name;
1424     namelen = request->namelen;
1425     hash = dlm_lockid_hash(name, namelen);
1426 
1427     if (namelen > DLM_LOCKID_NAME_MAX) {
1428         response = DLM_IVBUFLEN;
1429         goto send_response;
1430     }
1431 
1432 way_up_top:
1433     spin_lock(&dlm->spinlock);
1434     res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1435     if (res) {
1436         spin_unlock(&dlm->spinlock);
1437 
1438         /* take care of the easy cases up front */
1439         spin_lock(&res->spinlock);
1440 
1441         /*
1442          * Right after dlm spinlock was released, dlm_thread could have
1443          * purged the lockres. Check if lockres got unhashed. If so
1444          * start over.
1445          */
1446         if (hlist_unhashed(&res->hash_node)) {
1447             spin_unlock(&res->spinlock);
1448             dlm_lockres_put(res);
1449             goto way_up_top;
1450         }
1451 
1452         if (res->state & (DLM_LOCK_RES_RECOVERING|
1453                   DLM_LOCK_RES_MIGRATING)) {
1454             spin_unlock(&res->spinlock);
1455             mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1456                  "being recovered/migrated\n");
1457             response = DLM_MASTER_RESP_ERROR;
1458             if (mle)
1459                 kmem_cache_free(dlm_mle_cache, mle);
1460             goto send_response;
1461         }
1462 
1463         if (res->owner == dlm->node_num) {
1464             dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1465             spin_unlock(&res->spinlock);
1466             response = DLM_MASTER_RESP_YES;
1467             if (mle)
1468                 kmem_cache_free(dlm_mle_cache, mle);
1469 
1470             /* this node is the owner.
1471              * there is some extra work that needs to
1472              * happen now.  the requesting node has
1473              * caused all nodes up to this one to
1474              * create mles.  this node now needs to
1475              * go back and clean those up. */
1476             dispatch_assert = 1;
1477             goto send_response;
1478         } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1479             spin_unlock(&res->spinlock);
1480             // mlog(0, "node %u is the master\n", res->owner);
1481             response = DLM_MASTER_RESP_NO;
1482             if (mle)
1483                 kmem_cache_free(dlm_mle_cache, mle);
1484             goto send_response;
1485         }
1486 
1487         /* ok, there is no owner.  either this node is
1488          * being blocked, or it is actively trying to
1489          * master this lock. */
1490         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1491             mlog(ML_ERROR, "lock with no owner should be "
1492                  "in-progress!\n");
1493             BUG();
1494         }
1495 
1496         // mlog(0, "lockres is in progress...\n");
1497         spin_lock(&dlm->master_lock);
1498         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1499         if (!found) {
1500             mlog(ML_ERROR, "no mle found for this lock!\n");
1501             BUG();
1502         }
1503         set_maybe = 1;
1504         spin_lock(&tmpmle->spinlock);
1505         if (tmpmle->type == DLM_MLE_BLOCK) {
1506             // mlog(0, "this node is waiting for "
1507             // "lockres to be mastered\n");
1508             response = DLM_MASTER_RESP_NO;
1509         } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1510             mlog(0, "node %u is master, but trying to migrate to "
1511                  "node %u.\n", tmpmle->master, tmpmle->new_master);
1512             if (tmpmle->master == dlm->node_num) {
1513                 mlog(ML_ERROR, "no owner on lockres, but this "
1514                      "node is trying to migrate it to %u?!\n",
1515                      tmpmle->new_master);
1516                 BUG();
1517             } else {
1518                 /* the real master can respond on its own */
1519                 response = DLM_MASTER_RESP_NO;
1520             }
1521         } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1522             set_maybe = 0;
1523             if (tmpmle->master == dlm->node_num) {
1524                 response = DLM_MASTER_RESP_YES;
1525                 /* this node will be the owner.
1526                  * go back and clean the mles on any
1527                  * other nodes */
1528                 dispatch_assert = 1;
1529                 dlm_lockres_set_refmap_bit(dlm, res,
1530                                request->node_idx);
1531             } else
1532                 response = DLM_MASTER_RESP_NO;
1533         } else {
1534             // mlog(0, "this node is attempting to "
1535             // "master lockres\n");
1536             response = DLM_MASTER_RESP_MAYBE;
1537         }
1538         if (set_maybe)
1539             set_bit(request->node_idx, tmpmle->maybe_map);
1540         spin_unlock(&tmpmle->spinlock);
1541 
1542         spin_unlock(&dlm->master_lock);
1543         spin_unlock(&res->spinlock);
1544 
1545         /* keep the mle attached to heartbeat events */
1546         dlm_put_mle(tmpmle);
1547         if (mle)
1548             kmem_cache_free(dlm_mle_cache, mle);
1549         goto send_response;
1550     }
1551 
1552     /*
1553      * lockres doesn't exist on this node
1554      * if there is an MLE_BLOCK, return NO
1555      * if there is an MLE_MASTER, return MAYBE
1556      * otherwise, add an MLE_BLOCK, return NO
1557      */
1558     spin_lock(&dlm->master_lock);
1559     found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1560     if (!found) {
1561         /* this lockid has never been seen on this node yet */
1562         // mlog(0, "no mle found\n");
1563         if (!mle) {
1564             spin_unlock(&dlm->master_lock);
1565             spin_unlock(&dlm->spinlock);
1566 
1567             mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1568             if (!mle) {
1569                 response = DLM_MASTER_RESP_ERROR;
1570                 mlog_errno(-ENOMEM);
1571                 goto send_response;
1572             }
1573             goto way_up_top;
1574         }
1575 
1576         // mlog(0, "this is second time thru, already allocated, "
1577         // "add the block.\n");
1578         dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1579         set_bit(request->node_idx, mle->maybe_map);
1580         __dlm_insert_mle(dlm, mle);
1581         response = DLM_MASTER_RESP_NO;
1582     } else {
1583         spin_lock(&tmpmle->spinlock);
1584         if (tmpmle->master == dlm->node_num) {
1585             mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1586             BUG();
1587         }
1588         if (tmpmle->type == DLM_MLE_BLOCK)
1589             response = DLM_MASTER_RESP_NO;
1590         else if (tmpmle->type == DLM_MLE_MIGRATION) {
1591             mlog(0, "migration mle was found (%u->%u)\n",
1592                  tmpmle->master, tmpmle->new_master);
1593             /* real master can respond on its own */
1594             response = DLM_MASTER_RESP_NO;
1595         } else
1596             response = DLM_MASTER_RESP_MAYBE;
1597         set_bit(request->node_idx, tmpmle->maybe_map);
1598         spin_unlock(&tmpmle->spinlock);
1599     }
1600     spin_unlock(&dlm->master_lock);
1601     spin_unlock(&dlm->spinlock);
1602 
1603     if (found) {
1604         /* keep the mle attached to heartbeat events */
1605         dlm_put_mle(tmpmle);
1606     }
1607 send_response:
1608     /*
1609      * __dlm_lookup_lockres() grabbed a reference to this lockres.
1610      * The reference is released by dlm_assert_master_worker() under
1611      * the call to dlm_dispatch_assert_master().  If
1612      * dlm_assert_master_worker() isn't called, we drop it here.
1613      */
1614     if (dispatch_assert) {
1615         mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1616                  dlm->node_num, res->lockname.len, res->lockname.name);
1617         spin_lock(&res->spinlock);
1618         ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1619                          DLM_ASSERT_MASTER_MLE_CLEANUP);
1620         if (ret < 0) {
1621             mlog(ML_ERROR, "failed to dispatch assert master work\n");
1622             response = DLM_MASTER_RESP_ERROR;
1623             spin_unlock(&res->spinlock);
1624             dlm_lockres_put(res);
1625         } else {
1626             dispatched = 1;
1627             __dlm_lockres_grab_inflight_worker(dlm, res);
1628             spin_unlock(&res->spinlock);
1629         }
1630     } else {
1631         if (res)
1632             dlm_lockres_put(res);
1633     }
1634 
1635     if (!dispatched)
1636         dlm_put(dlm);
1637     return response;
1638 }
1639 
1640 /*
1641  * DLM_ASSERT_MASTER_MSG
1642  */
1643 
1644 
1645 /*
1646  * NOTE: this can be used for debugging
1647  * can periodically run all locks owned by this node
1648  * and re-assert across the cluster...
1649  */
1650 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1651                 struct dlm_lock_resource *res,
1652                 void *nodemap, u32 flags)
1653 {
1654     struct dlm_assert_master assert;
1655     int to, tmpret;
1656     struct dlm_node_iter iter;
1657     int ret = 0;
1658     int reassert;
1659     const char *lockname = res->lockname.name;
1660     unsigned int namelen = res->lockname.len;
1661 
1662     BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1663 
1664     spin_lock(&res->spinlock);
1665     res->state |= DLM_LOCK_RES_SETREF_INPROG;
1666     spin_unlock(&res->spinlock);
1667 
1668 again:
1669     reassert = 0;
1670 
1671     /* note that if this nodemap is empty, it returns 0 */
1672     dlm_node_iter_init(nodemap, &iter);
1673     while ((to = dlm_node_iter_next(&iter)) >= 0) {
1674         int r = 0;
1675         struct dlm_master_list_entry *mle = NULL;
1676 
1677         mlog(0, "sending assert master to %d (%.*s)\n", to,
1678              namelen, lockname);
1679         memset(&assert, 0, sizeof(assert));
1680         assert.node_idx = dlm->node_num;
1681         assert.namelen = namelen;
1682         memcpy(assert.name, lockname, namelen);
1683         assert.flags = cpu_to_be32(flags);
1684 
1685         tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1686                         &assert, sizeof(assert), to, &r);
1687         if (tmpret < 0) {
1688             mlog(ML_ERROR, "Error %d when sending message %u (key "
1689                  "0x%x) to node %u\n", tmpret,
1690                  DLM_ASSERT_MASTER_MSG, dlm->key, to);
1691             if (!dlm_is_host_down(tmpret)) {
1692                 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1693                 BUG();
1694             }
1695             /* a node died.  finish out the rest of the nodes. */
1696             mlog(0, "link to %d went down!\n", to);
1697             /* any nonzero status return will do */
1698             ret = tmpret;
1699             r = 0;
1700         } else if (r < 0) {
1701             /* ok, something horribly messed.  kill thyself. */
1702             mlog(ML_ERROR,"during assert master of %.*s to %u, "
1703                  "got %d.\n", namelen, lockname, to, r);
1704             spin_lock(&dlm->spinlock);
1705             spin_lock(&dlm->master_lock);
1706             if (dlm_find_mle(dlm, &mle, (char *)lockname,
1707                      namelen)) {
1708                 dlm_print_one_mle(mle);
1709                 __dlm_put_mle(mle);
1710             }
1711             spin_unlock(&dlm->master_lock);
1712             spin_unlock(&dlm->spinlock);
1713             BUG();
1714         }
1715 
1716         if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1717             !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1718                 mlog(ML_ERROR, "%.*s: very strange, "
1719                      "master MLE but no lockres on %u\n",
1720                      namelen, lockname, to);
1721         }
1722 
1723         if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1724             mlog(0, "%.*s: node %u create mles on other "
1725                  "nodes and requests a re-assert\n",
1726                  namelen, lockname, to);
1727             reassert = 1;
1728         }
1729         if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1730             mlog(0, "%.*s: node %u has a reference to this "
1731                  "lockres, set the bit in the refmap\n",
1732                  namelen, lockname, to);
1733             spin_lock(&res->spinlock);
1734             dlm_lockres_set_refmap_bit(dlm, res, to);
1735             spin_unlock(&res->spinlock);
1736         }
1737     }
1738 
1739     if (reassert)
1740         goto again;
1741 
1742     spin_lock(&res->spinlock);
1743     res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1744     spin_unlock(&res->spinlock);
1745     wake_up(&res->wq);
1746 
1747     return ret;
1748 }
1749 
1750 /*
1751  * locks that can be taken here:
1752  * dlm->spinlock
1753  * res->spinlock
1754  * mle->spinlock
1755  * dlm->master_list
1756  *
1757  * if possible, TRIM THIS DOWN!!!
1758  */
1759 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1760                   void **ret_data)
1761 {
1762     struct dlm_ctxt *dlm = data;
1763     struct dlm_master_list_entry *mle = NULL;
1764     struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1765     struct dlm_lock_resource *res = NULL;
1766     char *name;
1767     unsigned int namelen, hash;
1768     u32 flags;
1769     int master_request = 0, have_lockres_ref = 0;
1770     int ret = 0;
1771 
1772     if (!dlm_grab(dlm))
1773         return 0;
1774 
1775     name = assert->name;
1776     namelen = assert->namelen;
1777     hash = dlm_lockid_hash(name, namelen);
1778     flags = be32_to_cpu(assert->flags);
1779 
1780     if (namelen > DLM_LOCKID_NAME_MAX) {
1781         mlog(ML_ERROR, "Invalid name length!");
1782         goto done;
1783     }
1784 
1785     spin_lock(&dlm->spinlock);
1786 
1787     if (flags)
1788         mlog(0, "assert_master with flags: %u\n", flags);
1789 
1790     /* find the MLE */
1791     spin_lock(&dlm->master_lock);
1792     if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1793         /* not an error, could be master just re-asserting */
1794         mlog(0, "just got an assert_master from %u, but no "
1795              "MLE for it! (%.*s)\n", assert->node_idx,
1796              namelen, name);
1797     } else {
1798         int bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
1799         if (bit >= O2NM_MAX_NODES) {
1800             /* not necessarily an error, though less likely.
1801              * could be master just re-asserting. */
1802             mlog(0, "no bits set in the maybe_map, but %u "
1803                  "is asserting! (%.*s)\n", assert->node_idx,
1804                  namelen, name);
1805         } else if (bit != assert->node_idx) {
1806             if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1807                 mlog(0, "master %u was found, %u should "
1808                      "back off\n", assert->node_idx, bit);
1809             } else {
1810                 /* with the fix for bug 569, a higher node
1811                  * number winning the mastery will respond
1812                  * YES to mastery requests, but this node
1813                  * had no way of knowing.  let it pass. */
1814                 mlog(0, "%u is the lowest node, "
1815                      "%u is asserting. (%.*s)  %u must "
1816                      "have begun after %u won.\n", bit,
1817                      assert->node_idx, namelen, name, bit,
1818                      assert->node_idx);
1819             }
1820         }
1821         if (mle->type == DLM_MLE_MIGRATION) {
1822             if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1823                 mlog(0, "%s:%.*s: got cleanup assert"
1824                      " from %u for migration\n",
1825                      dlm->name, namelen, name,
1826                      assert->node_idx);
1827             } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1828                 mlog(0, "%s:%.*s: got unrelated assert"
1829                      " from %u for migration, ignoring\n",
1830                      dlm->name, namelen, name,
1831                      assert->node_idx);
1832                 __dlm_put_mle(mle);
1833                 spin_unlock(&dlm->master_lock);
1834                 spin_unlock(&dlm->spinlock);
1835                 goto done;
1836             }
1837         }
1838     }
1839     spin_unlock(&dlm->master_lock);
1840 
1841     /* ok everything checks out with the MLE
1842      * now check to see if there is a lockres */
1843     res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1844     if (res) {
1845         spin_lock(&res->spinlock);
1846         if (res->state & DLM_LOCK_RES_RECOVERING)  {
1847             mlog(ML_ERROR, "%u asserting but %.*s is "
1848                  "RECOVERING!\n", assert->node_idx, namelen, name);
1849             goto kill;
1850         }
1851         if (!mle) {
1852             if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1853                 res->owner != assert->node_idx) {
1854                 mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1855                      "but current owner is %u! (%.*s)\n",
1856                      assert->node_idx, res->owner, namelen,
1857                      name);
1858                 __dlm_print_one_lock_resource(res);
1859                 BUG();
1860             }
1861         } else if (mle->type != DLM_MLE_MIGRATION) {
1862             if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1863                 /* owner is just re-asserting */
1864                 if (res->owner == assert->node_idx) {
1865                     mlog(0, "owner %u re-asserting on "
1866                          "lock %.*s\n", assert->node_idx,
1867                          namelen, name);
1868                     goto ok;
1869                 }
1870                 mlog(ML_ERROR, "got assert_master from "
1871                      "node %u, but %u is the owner! "
1872                      "(%.*s)\n", assert->node_idx,
1873                      res->owner, namelen, name);
1874                 goto kill;
1875             }
1876             if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1877                 mlog(ML_ERROR, "got assert from %u, but lock "
1878                      "with no owner should be "
1879                      "in-progress! (%.*s)\n",
1880                      assert->node_idx,
1881                      namelen, name);
1882                 goto kill;
1883             }
1884         } else /* mle->type == DLM_MLE_MIGRATION */ {
1885             /* should only be getting an assert from new master */
1886             if (assert->node_idx != mle->new_master) {
1887                 mlog(ML_ERROR, "got assert from %u, but "
1888                      "new master is %u, and old master "
1889                      "was %u (%.*s)\n",
1890                      assert->node_idx, mle->new_master,
1891                      mle->master, namelen, name);
1892                 goto kill;
1893             }
1894 
1895         }
1896 ok:
1897         spin_unlock(&res->spinlock);
1898     }
1899 
1900     // mlog(0, "woo!  got an assert_master from node %u!\n",
1901     //       assert->node_idx);
1902     if (mle) {
1903         int extra_ref = 0;
1904         int nn = -1;
1905         int rr, err = 0;
1906 
1907         spin_lock(&mle->spinlock);
1908         if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1909             extra_ref = 1;
1910         else {
1911             /* MASTER mle: if any bits set in the response map
1912              * then the calling node needs to re-assert to clear
1913              * up nodes that this node contacted */
1914             while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1915                             nn+1)) < O2NM_MAX_NODES) {
1916                 if (nn != dlm->node_num && nn != assert->node_idx) {
1917                     master_request = 1;
1918                     break;
1919                 }
1920             }
1921         }
1922         mle->master = assert->node_idx;
1923         atomic_set(&mle->woken, 1);
1924         wake_up(&mle->wq);
1925         spin_unlock(&mle->spinlock);
1926 
1927         if (res) {
1928             int wake = 0;
1929             spin_lock(&res->spinlock);
1930             if (mle->type == DLM_MLE_MIGRATION) {
1931                 mlog(0, "finishing off migration of lockres %.*s, "
1932                         "from %u to %u\n",
1933                         res->lockname.len, res->lockname.name,
1934                         dlm->node_num, mle->new_master);
1935                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1936                 wake = 1;
1937                 dlm_change_lockres_owner(dlm, res, mle->new_master);
1938                 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1939             } else {
1940                 dlm_change_lockres_owner(dlm, res, mle->master);
1941             }
1942             spin_unlock(&res->spinlock);
1943             have_lockres_ref = 1;
1944             if (wake)
1945                 wake_up(&res->wq);
1946         }
1947 
1948         /* master is known, detach if not already detached.
1949          * ensures that only one assert_master call will happen
1950          * on this mle. */
1951         spin_lock(&dlm->master_lock);
1952 
1953         rr = kref_read(&mle->mle_refs);
1954         if (mle->inuse > 0) {
1955             if (extra_ref && rr < 3)
1956                 err = 1;
1957             else if (!extra_ref && rr < 2)
1958                 err = 1;
1959         } else {
1960             if (extra_ref && rr < 2)
1961                 err = 1;
1962             else if (!extra_ref && rr < 1)
1963                 err = 1;
1964         }
1965         if (err) {
1966             mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1967                  "that will mess up this node, refs=%d, extra=%d, "
1968                  "inuse=%d\n", dlm->name, namelen, name,
1969                  assert->node_idx, rr, extra_ref, mle->inuse);
1970             dlm_print_one_mle(mle);
1971         }
1972         __dlm_unlink_mle(dlm, mle);
1973         __dlm_mle_detach_hb_events(dlm, mle);
1974         __dlm_put_mle(mle);
1975         if (extra_ref) {
1976             /* the assert master message now balances the extra
1977              * ref given by the master / migration request message.
1978              * if this is the last put, it will be removed
1979              * from the list. */
1980             __dlm_put_mle(mle);
1981         }
1982         spin_unlock(&dlm->master_lock);
1983     } else if (res) {
1984         if (res->owner != assert->node_idx) {
1985             mlog(0, "assert_master from %u, but current "
1986                  "owner is %u (%.*s), no mle\n", assert->node_idx,
1987                  res->owner, namelen, name);
1988         }
1989     }
1990     spin_unlock(&dlm->spinlock);
1991 
1992 done:
1993     ret = 0;
1994     if (res) {
1995         spin_lock(&res->spinlock);
1996         res->state |= DLM_LOCK_RES_SETREF_INPROG;
1997         spin_unlock(&res->spinlock);
1998         *ret_data = (void *)res;
1999     }
2000     dlm_put(dlm);
2001     if (master_request) {
2002         mlog(0, "need to tell master to reassert\n");
2003         /* positive. negative would shoot down the node. */
2004         ret |= DLM_ASSERT_RESPONSE_REASSERT;
2005         if (!have_lockres_ref) {
2006             mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2007                  "mle present here for %s:%.*s, but no lockres!\n",
2008                  assert->node_idx, dlm->name, namelen, name);
2009         }
2010     }
2011     if (have_lockres_ref) {
2012         /* let the master know we have a reference to the lockres */
2013         ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2014         mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2015              dlm->name, namelen, name, assert->node_idx);
2016     }
2017     return ret;
2018 
2019 kill:
2020     /* kill the caller! */
2021     mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2022          "and killing the other node now!  This node is OK and can continue.\n");
2023     __dlm_print_one_lock_resource(res);
2024     spin_unlock(&res->spinlock);
2025     spin_lock(&dlm->master_lock);
2026     if (mle)
2027         __dlm_put_mle(mle);
2028     spin_unlock(&dlm->master_lock);
2029     spin_unlock(&dlm->spinlock);
2030     *ret_data = (void *)res;
2031     dlm_put(dlm);
2032     return -EINVAL;
2033 }
2034 
2035 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2036 {
2037     struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2038 
2039     if (ret_data) {
2040         spin_lock(&res->spinlock);
2041         res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2042         spin_unlock(&res->spinlock);
2043         wake_up(&res->wq);
2044         dlm_lockres_put(res);
2045     }
2046     return;
2047 }
2048 
2049 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2050                    struct dlm_lock_resource *res,
2051                    int ignore_higher, u8 request_from, u32 flags)
2052 {
2053     struct dlm_work_item *item;
2054     item = kzalloc(sizeof(*item), GFP_ATOMIC);
2055     if (!item)
2056         return -ENOMEM;
2057 
2058 
2059     /* queue up work for dlm_assert_master_worker */
2060     dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2061     item->u.am.lockres = res; /* already have a ref */
2062     /* can optionally ignore node numbers higher than this node */
2063     item->u.am.ignore_higher = ignore_higher;
2064     item->u.am.request_from = request_from;
2065     item->u.am.flags = flags;
2066 
2067     if (ignore_higher)
2068         mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2069              res->lockname.name);
2070 
2071     spin_lock(&dlm->work_lock);
2072     list_add_tail(&item->list, &dlm->work_list);
2073     spin_unlock(&dlm->work_lock);
2074 
2075     queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2076     return 0;
2077 }
2078 
2079 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2080 {
2081     struct dlm_ctxt *dlm = data;
2082     int ret = 0;
2083     struct dlm_lock_resource *res;
2084     unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2085     int ignore_higher;
2086     int bit;
2087     u8 request_from;
2088     u32 flags;
2089 
2090     dlm = item->dlm;
2091     res = item->u.am.lockres;
2092     ignore_higher = item->u.am.ignore_higher;
2093     request_from = item->u.am.request_from;
2094     flags = item->u.am.flags;
2095 
2096     spin_lock(&dlm->spinlock);
2097     memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2098     spin_unlock(&dlm->spinlock);
2099 
2100     clear_bit(dlm->node_num, nodemap);
2101     if (ignore_higher) {
2102         /* if is this just to clear up mles for nodes below
2103          * this node, do not send the message to the original
2104          * caller or any node number higher than this */
2105         clear_bit(request_from, nodemap);
2106         bit = dlm->node_num;
2107         while (1) {
2108             bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2109                         bit+1);
2110                 if (bit >= O2NM_MAX_NODES)
2111                 break;
2112             clear_bit(bit, nodemap);
2113         }
2114     }
2115 
2116     /*
2117      * If we're migrating this lock to someone else, we are no
2118      * longer allowed to assert out own mastery.  OTOH, we need to
2119      * prevent migration from starting while we're still asserting
2120      * our dominance.  The reserved ast delays migration.
2121      */
2122     spin_lock(&res->spinlock);
2123     if (res->state & DLM_LOCK_RES_MIGRATING) {
2124         mlog(0, "Someone asked us to assert mastery, but we're "
2125              "in the middle of migration.  Skipping assert, "
2126              "the new master will handle that.\n");
2127         spin_unlock(&res->spinlock);
2128         goto put;
2129     } else
2130         __dlm_lockres_reserve_ast(res);
2131     spin_unlock(&res->spinlock);
2132 
2133     /* this call now finishes out the nodemap
2134      * even if one or more nodes die */
2135     mlog(0, "worker about to master %.*s here, this=%u\n",
2136              res->lockname.len, res->lockname.name, dlm->node_num);
2137     ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2138     if (ret < 0) {
2139         /* no need to restart, we are done */
2140         if (!dlm_is_host_down(ret))
2141             mlog_errno(ret);
2142     }
2143 
2144     /* Ok, we've asserted ourselves.  Let's let migration start. */
2145     dlm_lockres_release_ast(dlm, res);
2146 
2147 put:
2148     dlm_lockres_drop_inflight_worker(dlm, res);
2149 
2150     dlm_lockres_put(res);
2151 
2152     mlog(0, "finished with dlm_assert_master_worker\n");
2153 }
2154 
2155 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2156  * We cannot wait for node recovery to complete to begin mastering this
2157  * lockres because this lockres is used to kick off recovery! ;-)
2158  * So, do a pre-check on all living nodes to see if any of those nodes
2159  * think that $RECOVERY is currently mastered by a dead node.  If so,
2160  * we wait a short time to allow that node to get notified by its own
2161  * heartbeat stack, then check again.  All $RECOVERY lock resources
2162  * mastered by dead nodes are purged when the heartbeat callback is
2163  * fired, so we can know for sure that it is safe to continue once
2164  * the node returns a live node or no node.  */
2165 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2166                        struct dlm_lock_resource *res)
2167 {
2168     struct dlm_node_iter iter;
2169     int nodenum;
2170     int ret = 0;
2171     u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2172 
2173     spin_lock(&dlm->spinlock);
2174     dlm_node_iter_init(dlm->domain_map, &iter);
2175     spin_unlock(&dlm->spinlock);
2176 
2177     while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2178         /* do not send to self */
2179         if (nodenum == dlm->node_num)
2180             continue;
2181         ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2182         if (ret < 0) {
2183             mlog_errno(ret);
2184             if (!dlm_is_host_down(ret))
2185                 BUG();
2186             /* host is down, so answer for that node would be
2187              * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2188             ret = 0;
2189         }
2190 
2191         if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2192             /* check to see if this master is in the recovery map */
2193             spin_lock(&dlm->spinlock);
2194             if (test_bit(master, dlm->recovery_map)) {
2195                 mlog(ML_NOTICE, "%s: node %u has not seen "
2196                      "node %u go down yet, and thinks the "
2197                      "dead node is mastering the recovery "
2198                      "lock.  must wait.\n", dlm->name,
2199                      nodenum, master);
2200                 ret = -EAGAIN;
2201             }
2202             spin_unlock(&dlm->spinlock);
2203             mlog(0, "%s: reco lock master is %u\n", dlm->name,
2204                  master);
2205             break;
2206         }
2207     }
2208     return ret;
2209 }
2210 
2211 /*
2212  * DLM_DEREF_LOCKRES_MSG
2213  */
2214 
2215 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2216 {
2217     struct dlm_deref_lockres deref;
2218     int ret = 0, r;
2219     const char *lockname;
2220     unsigned int namelen;
2221 
2222     lockname = res->lockname.name;
2223     namelen = res->lockname.len;
2224     BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2225 
2226     memset(&deref, 0, sizeof(deref));
2227     deref.node_idx = dlm->node_num;
2228     deref.namelen = namelen;
2229     memcpy(deref.name, lockname, namelen);
2230 
2231     ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2232                  &deref, sizeof(deref), res->owner, &r);
2233     if (ret < 0)
2234         mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2235              dlm->name, namelen, lockname, ret, res->owner);
2236     else if (r < 0) {
2237         /* BAD.  other node says I did not have a ref. */
2238         mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2239              dlm->name, namelen, lockname, res->owner, r);
2240         dlm_print_one_lock_resource(res);
2241         if (r == -ENOMEM)
2242             BUG();
2243     } else
2244         ret = r;
2245 
2246     return ret;
2247 }
2248 
2249 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2250                   void **ret_data)
2251 {
2252     struct dlm_ctxt *dlm = data;
2253     struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2254     struct dlm_lock_resource *res = NULL;
2255     char *name;
2256     unsigned int namelen;
2257     int ret = -EINVAL;
2258     u8 node;
2259     unsigned int hash;
2260     struct dlm_work_item *item;
2261     int cleared = 0;
2262     int dispatch = 0;
2263 
2264     if (!dlm_grab(dlm))
2265         return 0;
2266 
2267     name = deref->name;
2268     namelen = deref->namelen;
2269     node = deref->node_idx;
2270 
2271     if (namelen > DLM_LOCKID_NAME_MAX) {
2272         mlog(ML_ERROR, "Invalid name length!");
2273         goto done;
2274     }
2275     if (deref->node_idx >= O2NM_MAX_NODES) {
2276         mlog(ML_ERROR, "Invalid node number: %u\n", node);
2277         goto done;
2278     }
2279 
2280     hash = dlm_lockid_hash(name, namelen);
2281 
2282     spin_lock(&dlm->spinlock);
2283     res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2284     if (!res) {
2285         spin_unlock(&dlm->spinlock);
2286         mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2287              dlm->name, namelen, name);
2288         goto done;
2289     }
2290     spin_unlock(&dlm->spinlock);
2291 
2292     spin_lock(&res->spinlock);
2293     if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2294         dispatch = 1;
2295     else {
2296         BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2297         if (test_bit(node, res->refmap)) {
2298             dlm_lockres_clear_refmap_bit(dlm, res, node);
2299             cleared = 1;
2300         }
2301     }
2302     spin_unlock(&res->spinlock);
2303 
2304     if (!dispatch) {
2305         if (cleared)
2306             dlm_lockres_calc_usage(dlm, res);
2307         else {
2308             mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2309                 "but it is already dropped!\n", dlm->name,
2310                 res->lockname.len, res->lockname.name, node);
2311             dlm_print_one_lock_resource(res);
2312         }
2313         ret = DLM_DEREF_RESPONSE_DONE;
2314         goto done;
2315     }
2316 
2317     item = kzalloc(sizeof(*item), GFP_NOFS);
2318     if (!item) {
2319         ret = -ENOMEM;
2320         mlog_errno(ret);
2321         goto done;
2322     }
2323 
2324     dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2325     item->u.dl.deref_res = res;
2326     item->u.dl.deref_node = node;
2327 
2328     spin_lock(&dlm->work_lock);
2329     list_add_tail(&item->list, &dlm->work_list);
2330     spin_unlock(&dlm->work_lock);
2331 
2332     queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2333     return DLM_DEREF_RESPONSE_INPROG;
2334 
2335 done:
2336     if (res)
2337         dlm_lockres_put(res);
2338     dlm_put(dlm);
2339 
2340     return ret;
2341 }
2342 
2343 int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
2344                   void **ret_data)
2345 {
2346     struct dlm_ctxt *dlm = data;
2347     struct dlm_deref_lockres_done *deref
2348             = (struct dlm_deref_lockres_done *)msg->buf;
2349     struct dlm_lock_resource *res = NULL;
2350     char *name;
2351     unsigned int namelen;
2352     int ret = -EINVAL;
2353     u8 node;
2354     unsigned int hash;
2355 
2356     if (!dlm_grab(dlm))
2357         return 0;
2358 
2359     name = deref->name;
2360     namelen = deref->namelen;
2361     node = deref->node_idx;
2362 
2363     if (namelen > DLM_LOCKID_NAME_MAX) {
2364         mlog(ML_ERROR, "Invalid name length!");
2365         goto done;
2366     }
2367     if (deref->node_idx >= O2NM_MAX_NODES) {
2368         mlog(ML_ERROR, "Invalid node number: %u\n", node);
2369         goto done;
2370     }
2371 
2372     hash = dlm_lockid_hash(name, namelen);
2373 
2374     spin_lock(&dlm->spinlock);
2375     res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2376     if (!res) {
2377         spin_unlock(&dlm->spinlock);
2378         mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2379              dlm->name, namelen, name);
2380         goto done;
2381     }
2382 
2383     spin_lock(&res->spinlock);
2384     if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
2385         spin_unlock(&res->spinlock);
2386         spin_unlock(&dlm->spinlock);
2387         mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
2388             "but it is already derefed!\n", dlm->name,
2389             res->lockname.len, res->lockname.name, node);
2390         ret = 0;
2391         goto done;
2392     }
2393 
2394     __dlm_do_purge_lockres(dlm, res);
2395     spin_unlock(&res->spinlock);
2396     wake_up(&res->wq);
2397 
2398     spin_unlock(&dlm->spinlock);
2399 
2400     ret = 0;
2401 done:
2402     if (res)
2403         dlm_lockres_put(res);
2404     dlm_put(dlm);
2405     return ret;
2406 }
2407 
2408 static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
2409         struct dlm_lock_resource *res, u8 node)
2410 {
2411     struct dlm_deref_lockres_done deref;
2412     int ret = 0, r;
2413     const char *lockname;
2414     unsigned int namelen;
2415 
2416     lockname = res->lockname.name;
2417     namelen = res->lockname.len;
2418     BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2419 
2420     memset(&deref, 0, sizeof(deref));
2421     deref.node_idx = dlm->node_num;
2422     deref.namelen = namelen;
2423     memcpy(deref.name, lockname, namelen);
2424 
2425     ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
2426                  &deref, sizeof(deref), node, &r);
2427     if (ret < 0) {
2428         mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
2429                 " to node %u\n", dlm->name, namelen,
2430                 lockname, ret, node);
2431     } else if (r < 0) {
2432         /* ignore the error */
2433         mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2434              dlm->name, namelen, lockname, node, r);
2435         dlm_print_one_lock_resource(res);
2436     }
2437 }
2438 
2439 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2440 {
2441     struct dlm_ctxt *dlm;
2442     struct dlm_lock_resource *res;
2443     u8 node;
2444     u8 cleared = 0;
2445 
2446     dlm = item->dlm;
2447     res = item->u.dl.deref_res;
2448     node = item->u.dl.deref_node;
2449 
2450     spin_lock(&res->spinlock);
2451     BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2452     __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2453     if (test_bit(node, res->refmap)) {
2454         dlm_lockres_clear_refmap_bit(dlm, res, node);
2455         cleared = 1;
2456     }
2457     spin_unlock(&res->spinlock);
2458 
2459     dlm_drop_lockres_ref_done(dlm, res, node);
2460 
2461     if (cleared) {
2462         mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2463              dlm->name, res->lockname.len, res->lockname.name, node);
2464         dlm_lockres_calc_usage(dlm, res);
2465     } else {
2466         mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2467              "but it is already dropped!\n", dlm->name,
2468              res->lockname.len, res->lockname.name, node);
2469         dlm_print_one_lock_resource(res);
2470     }
2471 
2472     dlm_lockres_put(res);
2473 }
2474 
2475 /*
2476  * A migratable resource is one that is :
2477  * 1. locally mastered, and,
2478  * 2. zero local locks, and,
2479  * 3. one or more non-local locks, or, one or more references
2480  * Returns 1 if yes, 0 if not.
2481  */
2482 static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
2483                       struct dlm_lock_resource *res)
2484 {
2485     enum dlm_lockres_list idx;
2486     int nonlocal = 0, node_ref;
2487     struct list_head *queue;
2488     struct dlm_lock *lock;
2489     u64 cookie;
2490 
2491     assert_spin_locked(&res->spinlock);
2492 
2493     /* delay migration when the lockres is in MIGRATING state */
2494     if (res->state & DLM_LOCK_RES_MIGRATING)
2495         return 0;
2496 
2497     /* delay migration when the lockres is in RECOCERING state */
2498     if (res->state & (DLM_LOCK_RES_RECOVERING|
2499             DLM_LOCK_RES_RECOVERY_WAITING))
2500         return 0;
2501 
2502     if (res->owner != dlm->node_num)
2503         return 0;
2504 
2505         for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2506         queue = dlm_list_idx_to_ptr(res, idx);
2507         list_for_each_entry(lock, queue, list) {
2508             if (lock->ml.node != dlm->node_num) {
2509                 nonlocal++;
2510                 continue;
2511             }
2512             cookie = be64_to_cpu(lock->ml.cookie);
2513             mlog(0, "%s: Not migratable res %.*s, lock %u:%llu on "
2514                  "%s list\n", dlm->name, res->lockname.len,
2515                  res->lockname.name,
2516                  dlm_get_lock_cookie_node(cookie),
2517                  dlm_get_lock_cookie_seq(cookie),
2518                  dlm_list_in_text(idx));
2519             return 0;
2520         }
2521     }
2522 
2523     if (!nonlocal) {
2524         node_ref = find_first_bit(res->refmap, O2NM_MAX_NODES);
2525         if (node_ref >= O2NM_MAX_NODES)
2526             return 0;
2527     }
2528 
2529     mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
2530          res->lockname.name);
2531 
2532     return 1;
2533 }
2534 
2535 /*
2536  * DLM_MIGRATE_LOCKRES
2537  */
2538 
2539 
2540 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2541                    struct dlm_lock_resource *res, u8 target)
2542 {
2543     struct dlm_master_list_entry *mle = NULL;
2544     struct dlm_master_list_entry *oldmle = NULL;
2545     struct dlm_migratable_lockres *mres = NULL;
2546     int ret = 0;
2547     const char *name;
2548     unsigned int namelen;
2549     int mle_added = 0;
2550     int wake = 0;
2551 
2552     if (!dlm_grab(dlm))
2553         return -EINVAL;
2554 
2555     name = res->lockname.name;
2556     namelen = res->lockname.len;
2557 
2558     mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2559          target);
2560 
2561     /* preallocate up front. if this fails, abort */
2562     ret = -ENOMEM;
2563     mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2564     if (!mres) {
2565         mlog_errno(ret);
2566         goto leave;
2567     }
2568 
2569     mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2570     if (!mle) {
2571         mlog_errno(ret);
2572         goto leave;
2573     }
2574     ret = 0;
2575 
2576     /*
2577      * clear any existing master requests and
2578      * add the migration mle to the list
2579      */
2580     spin_lock(&dlm->spinlock);
2581     spin_lock(&dlm->master_lock);
2582     ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2583                     namelen, target, dlm->node_num);
2584     /* get an extra reference on the mle.
2585      * otherwise the assert_master from the new
2586      * master will destroy this.
2587      */
2588     if (ret != -EEXIST)
2589         dlm_get_mle_inuse(mle);
2590 
2591     spin_unlock(&dlm->master_lock);
2592     spin_unlock(&dlm->spinlock);
2593 
2594     if (ret == -EEXIST) {
2595         mlog(0, "another process is already migrating it\n");
2596         goto fail;
2597     }
2598     mle_added = 1;
2599 
2600     /*
2601      * set the MIGRATING flag and flush asts
2602      * if we fail after this we need to re-dirty the lockres
2603      */
2604     if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2605         mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2606              "the target went down.\n", res->lockname.len,
2607              res->lockname.name, target);
2608         spin_lock(&res->spinlock);
2609         res->state &= ~DLM_LOCK_RES_MIGRATING;
2610         wake = 1;
2611         spin_unlock(&res->spinlock);
2612         ret = -EINVAL;
2613     }
2614 
2615 fail:
2616     if (ret != -EEXIST && oldmle) {
2617         /* master is known, detach if not already detached */
2618         dlm_mle_detach_hb_events(dlm, oldmle);
2619         dlm_put_mle(oldmle);
2620     }
2621 
2622     if (ret < 0) {
2623         if (mle_added) {
2624             dlm_mle_detach_hb_events(dlm, mle);
2625             dlm_put_mle(mle);
2626             dlm_put_mle_inuse(mle);
2627         } else if (mle) {
2628             kmem_cache_free(dlm_mle_cache, mle);
2629             mle = NULL;
2630         }
2631         goto leave;
2632     }
2633 
2634     /*
2635      * at this point, we have a migration target, an mle
2636      * in the master list, and the MIGRATING flag set on
2637      * the lockres
2638      */
2639 
2640     /* now that remote nodes are spinning on the MIGRATING flag,
2641      * ensure that all assert_master work is flushed. */
2642     flush_workqueue(dlm->dlm_worker);
2643 
2644     /* notify new node and send all lock state */
2645     /* call send_one_lockres with migration flag.
2646      * this serves as notice to the target node that a
2647      * migration is starting. */
2648     ret = dlm_send_one_lockres(dlm, res, mres, target,
2649                    DLM_MRES_MIGRATION);
2650 
2651     if (ret < 0) {
2652         mlog(0, "migration to node %u failed with %d\n",
2653              target, ret);
2654         /* migration failed, detach and clean up mle */
2655         dlm_mle_detach_hb_events(dlm, mle);
2656         dlm_put_mle(mle);
2657         dlm_put_mle_inuse(mle);
2658         spin_lock(&res->spinlock);
2659         res->state &= ~DLM_LOCK_RES_MIGRATING;
2660         wake = 1;
2661         spin_unlock(&res->spinlock);
2662         if (dlm_is_host_down(ret))
2663             dlm_wait_for_node_death(dlm, target,
2664                         DLM_NODE_DEATH_WAIT_MAX);
2665         goto leave;
2666     }
2667 
2668     /* at this point, the target sends a message to all nodes,
2669      * (using dlm_do_migrate_request).  this node is skipped since
2670      * we had to put an mle in the list to begin the process.  this
2671      * node now waits for target to do an assert master.  this node
2672      * will be the last one notified, ensuring that the migration
2673      * is complete everywhere.  if the target dies while this is
2674      * going on, some nodes could potentially see the target as the
2675      * master, so it is important that my recovery finds the migration
2676      * mle and sets the master to UNKNOWN. */
2677 
2678 
2679     /* wait for new node to assert master */
2680     while (1) {
2681         ret = wait_event_interruptible_timeout(mle->wq,
2682                     (atomic_read(&mle->woken) == 1),
2683                     msecs_to_jiffies(5000));
2684 
2685         if (ret >= 0) {
2686                 if (atomic_read(&mle->woken) == 1 ||
2687                 res->owner == target)
2688                 break;
2689 
2690             mlog(0, "%s:%.*s: timed out during migration\n",
2691                  dlm->name, res->lockname.len, res->lockname.name);
2692             /* avoid hang during shutdown when migrating lockres
2693              * to a node which also goes down */
2694             if (dlm_is_node_dead(dlm, target)) {
2695                 mlog(0, "%s:%.*s: expected migration "
2696                      "target %u is no longer up, restarting\n",
2697                      dlm->name, res->lockname.len,
2698                      res->lockname.name, target);
2699                 ret = -EINVAL;
2700                 /* migration failed, detach and clean up mle */
2701                 dlm_mle_detach_hb_events(dlm, mle);
2702                 dlm_put_mle(mle);
2703                 dlm_put_mle_inuse(mle);
2704                 spin_lock(&res->spinlock);
2705                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2706                 wake = 1;
2707                 spin_unlock(&res->spinlock);
2708                 goto leave;
2709             }
2710         } else
2711             mlog(0, "%s:%.*s: caught signal during migration\n",
2712                  dlm->name, res->lockname.len, res->lockname.name);
2713     }
2714 
2715     /* all done, set the owner, clear the flag */
2716     spin_lock(&res->spinlock);
2717     dlm_set_lockres_owner(dlm, res, target);
2718     res->state &= ~DLM_LOCK_RES_MIGRATING;
2719     dlm_remove_nonlocal_locks(dlm, res);
2720     spin_unlock(&res->spinlock);
2721     wake_up(&res->wq);
2722 
2723     /* master is known, detach if not already detached */
2724     dlm_mle_detach_hb_events(dlm, mle);
2725     dlm_put_mle_inuse(mle);
2726     ret = 0;
2727 
2728     dlm_lockres_calc_usage(dlm, res);
2729 
2730 leave:
2731     /* re-dirty the lockres if we failed */
2732     if (ret < 0)
2733         dlm_kick_thread(dlm, res);
2734 
2735     /* wake up waiters if the MIGRATING flag got set
2736      * but migration failed */
2737     if (wake)
2738         wake_up(&res->wq);
2739 
2740     if (mres)
2741         free_page((unsigned long)mres);
2742 
2743     dlm_put(dlm);
2744 
2745     mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2746          name, target, ret);
2747     return ret;
2748 }
2749 
2750 /*
2751  * Should be called only after beginning the domain leave process.
2752  * There should not be any remaining locks on nonlocal lock resources,
2753  * and there should be no local locks left on locally mastered resources.
2754  *
2755  * Called with the dlm spinlock held, may drop it to do migration, but
2756  * will re-acquire before exit.
2757  *
2758  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2759  */
2760 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2761     __must_hold(&dlm->spinlock)
2762 {
2763     int ret;
2764     int lock_dropped = 0;
2765     u8 target = O2NM_MAX_NODES;
2766 
2767     assert_spin_locked(&dlm->spinlock);
2768 
2769     spin_lock(&res->spinlock);
2770     if (dlm_is_lockres_migratable(dlm, res))
2771         target = dlm_pick_migration_target(dlm, res);
2772     spin_unlock(&res->spinlock);
2773 
2774     if (target == O2NM_MAX_NODES)
2775         goto leave;
2776 
2777     /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2778     spin_unlock(&dlm->spinlock);
2779     lock_dropped = 1;
2780     ret = dlm_migrate_lockres(dlm, res, target);
2781     if (ret)
2782         mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2783              dlm->name, res->lockname.len, res->lockname.name,
2784              target, ret);
2785     spin_lock(&dlm->spinlock);
2786 leave:
2787     return lock_dropped;
2788 }
2789 
2790 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2791 {
2792     int ret;
2793     spin_lock(&dlm->ast_lock);
2794     spin_lock(&lock->spinlock);
2795     ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2796     spin_unlock(&lock->spinlock);
2797     spin_unlock(&dlm->ast_lock);
2798     return ret;
2799 }
2800 
2801 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2802                      struct dlm_lock_resource *res,
2803                      u8 mig_target)
2804 {
2805     int can_proceed;
2806     spin_lock(&res->spinlock);
2807     can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2808     spin_unlock(&res->spinlock);
2809 
2810     /* target has died, so make the caller break out of the
2811      * wait_event, but caller must recheck the domain_map */
2812     spin_lock(&dlm->spinlock);
2813     if (!test_bit(mig_target, dlm->domain_map))
2814         can_proceed = 1;
2815     spin_unlock(&dlm->spinlock);
2816     return can_proceed;
2817 }
2818 
2819 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2820                 struct dlm_lock_resource *res)
2821 {
2822     int ret;
2823     spin_lock(&res->spinlock);
2824     ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2825     spin_unlock(&res->spinlock);
2826     return ret;
2827 }
2828 
2829 
2830 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2831                        struct dlm_lock_resource *res,
2832                        u8 target)
2833 {
2834     int ret = 0;
2835 
2836     mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2837            res->lockname.len, res->lockname.name, dlm->node_num,
2838            target);
2839     /* need to set MIGRATING flag on lockres.  this is done by
2840      * ensuring that all asts have been flushed for this lockres. */
2841     spin_lock(&res->spinlock);
2842     BUG_ON(res->migration_pending);
2843     res->migration_pending = 1;
2844     /* strategy is to reserve an extra ast then release
2845      * it below, letting the release do all of the work */
2846     __dlm_lockres_reserve_ast(res);
2847     spin_unlock(&res->spinlock);
2848 
2849     /* now flush all the pending asts */
2850     dlm_kick_thread(dlm, res);
2851     /* before waiting on DIRTY, block processes which may
2852      * try to dirty the lockres before MIGRATING is set */
2853     spin_lock(&res->spinlock);
2854     BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2855     res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2856     spin_unlock(&res->spinlock);
2857     /* now wait on any pending asts and the DIRTY state */
2858     wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2859     dlm_lockres_release_ast(dlm, res);
2860 
2861     mlog(0, "about to wait on migration_wq, dirty=%s\n",
2862            res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2863     /* if the extra ref we just put was the final one, this
2864      * will pass thru immediately.  otherwise, we need to wait
2865      * for the last ast to finish. */
2866 again:
2867     ret = wait_event_interruptible_timeout(dlm->migration_wq,
2868            dlm_migration_can_proceed(dlm, res, target),
2869            msecs_to_jiffies(1000));
2870     if (ret < 0) {
2871         mlog(0, "woken again: migrating? %s, dead? %s\n",
2872                res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2873                test_bit(target, dlm->domain_map) ? "no":"yes");
2874     } else {
2875         mlog(0, "all is well: migrating? %s, dead? %s\n",
2876                res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2877                test_bit(target, dlm->domain_map) ? "no":"yes");
2878     }
2879     if (!dlm_migration_can_proceed(dlm, res, target)) {
2880         mlog(0, "trying again...\n");
2881         goto again;
2882     }
2883 
2884     ret = 0;
2885     /* did the target go down or die? */
2886     spin_lock(&dlm->spinlock);
2887     if (!test_bit(target, dlm->domain_map)) {
2888         mlog(ML_ERROR, "aha. migration target %u just went down\n",
2889              target);
2890         ret = -EHOSTDOWN;
2891     }
2892     spin_unlock(&dlm->spinlock);
2893 
2894     /*
2895      * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2896      * another try; otherwise, we are sure the MIGRATING state is there,
2897      * drop the unneeded state which blocked threads trying to DIRTY
2898      */
2899     spin_lock(&res->spinlock);
2900     BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2901     res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2902     if (!ret)
2903         BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2904     else
2905         res->migration_pending = 0;
2906     spin_unlock(&res->spinlock);
2907 
2908     /*
2909      * at this point:
2910      *
2911      *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2912      *   o there are no pending asts on this lockres
2913      *   o all processes trying to reserve an ast on this
2914      *     lockres must wait for the MIGRATING flag to clear
2915      */
2916     return ret;
2917 }
2918 
2919 /* last step in the migration process.
2920  * original master calls this to free all of the dlm_lock
2921  * structures that used to be for other nodes. */
2922 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2923                       struct dlm_lock_resource *res)
2924 {
2925     struct list_head *queue = &res->granted;
2926     int i, bit;
2927     struct dlm_lock *lock, *next;
2928 
2929     assert_spin_locked(&res->spinlock);
2930 
2931     BUG_ON(res->owner == dlm->node_num);
2932 
2933     for (i=0; i<3; i++) {
2934         list_for_each_entry_safe(lock, next, queue, list) {
2935             if (lock->ml.node != dlm->node_num) {
2936                 mlog(0, "putting lock for node %u\n",
2937                      lock->ml.node);
2938                 /* be extra careful */
2939                 BUG_ON(!list_empty(&lock->ast_list));
2940                 BUG_ON(!list_empty(&lock->bast_list));
2941                 BUG_ON(lock->ast_pending);
2942                 BUG_ON(lock->bast_pending);
2943                 dlm_lockres_clear_refmap_bit(dlm, res,
2944                                  lock->ml.node);
2945                 list_del_init(&lock->list);
2946                 dlm_lock_put(lock);
2947                 /* In a normal unlock, we would have added a
2948                  * DLM_UNLOCK_FREE_LOCK action. Force it. */
2949                 dlm_lock_put(lock);
2950             }
2951         }
2952         queue++;
2953     }
2954     bit = 0;
2955     while (1) {
2956         bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2957         if (bit >= O2NM_MAX_NODES)
2958             break;
2959         /* do not clear the local node reference, if there is a
2960          * process holding this, let it drop the ref itself */
2961         if (bit != dlm->node_num) {
2962             mlog(0, "%s:%.*s: node %u had a ref to this "
2963                  "migrating lockres, clearing\n", dlm->name,
2964                  res->lockname.len, res->lockname.name, bit);
2965             dlm_lockres_clear_refmap_bit(dlm, res, bit);
2966         }
2967         bit++;
2968     }
2969 }
2970 
2971 /*
2972  * Pick a node to migrate the lock resource to. This function selects a
2973  * potential target based first on the locks and then on refmap. It skips
2974  * nodes that are in the process of exiting the domain.
2975  */
2976 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2977                     struct dlm_lock_resource *res)
2978 {
2979     enum dlm_lockres_list idx;
2980     struct list_head *queue;
2981     struct dlm_lock *lock;
2982     int noderef;
2983     u8 nodenum = O2NM_MAX_NODES;
2984 
2985     assert_spin_locked(&dlm->spinlock);
2986     assert_spin_locked(&res->spinlock);
2987 
2988     /* Go through all the locks */
2989     for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2990         queue = dlm_list_idx_to_ptr(res, idx);
2991         list_for_each_entry(lock, queue, list) {
2992             if (lock->ml.node == dlm->node_num)
2993                 continue;
2994             if (test_bit(lock->ml.node, dlm->exit_domain_map))
2995                 continue;
2996             nodenum = lock->ml.node;
2997             goto bail;
2998         }
2999     }
3000 
3001     /* Go thru the refmap */
3002     noderef = -1;
3003     while (1) {
3004         noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
3005                     noderef + 1);
3006         if (noderef >= O2NM_MAX_NODES)
3007             break;
3008         if (noderef == dlm->node_num)
3009             continue;
3010         if (test_bit(noderef, dlm->exit_domain_map))
3011             continue;
3012         nodenum = noderef;
3013         goto bail;
3014     }
3015 
3016 bail:
3017     return nodenum;
3018 }
3019 
3020 /* this is called by the new master once all lockres
3021  * data has been received */
3022 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3023                   struct dlm_lock_resource *res,
3024                   u8 master, u8 new_master,
3025                   struct dlm_node_iter *iter)
3026 {
3027     struct dlm_migrate_request migrate;
3028     int ret, skip, status = 0;
3029     int nodenum;
3030 
3031     memset(&migrate, 0, sizeof(migrate));
3032     migrate.namelen = res->lockname.len;
3033     memcpy(migrate.name, res->lockname.name, migrate.namelen);
3034     migrate.new_master = new_master;
3035     migrate.master = master;
3036 
3037     ret = 0;
3038 
3039     /* send message to all nodes, except the master and myself */
3040     while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
3041         if (nodenum == master ||
3042             nodenum == new_master)
3043             continue;
3044 
3045         /* We could race exit domain. If exited, skip. */
3046         spin_lock(&dlm->spinlock);
3047         skip = (!test_bit(nodenum, dlm->domain_map));
3048         spin_unlock(&dlm->spinlock);
3049         if (skip) {
3050             clear_bit(nodenum, iter->node_map);
3051             continue;
3052         }
3053 
3054         ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3055                      &migrate, sizeof(migrate), nodenum,
3056                      &status);
3057         if (ret < 0) {
3058             mlog(ML_ERROR, "%s: res %.*s, Error %d send "
3059                  "MIGRATE_REQUEST to node %u\n", dlm->name,
3060                  migrate.namelen, migrate.name, ret, nodenum);
3061             if (!dlm_is_host_down(ret)) {
3062                 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
3063                 BUG();
3064             }
3065             clear_bit(nodenum, iter->node_map);
3066             ret = 0;
3067         } else if (status < 0) {
3068             mlog(0, "migrate request (node %u) returned %d!\n",
3069                  nodenum, status);
3070             ret = status;
3071         } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3072             /* during the migration request we short-circuited
3073              * the mastery of the lockres.  make sure we have
3074              * a mastery ref for nodenum */
3075             mlog(0, "%s:%.*s: need ref for node %u\n",
3076                  dlm->name, res->lockname.len, res->lockname.name,
3077                  nodenum);
3078             spin_lock(&res->spinlock);
3079             dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3080             spin_unlock(&res->spinlock);
3081         }
3082     }
3083 
3084     if (ret < 0)
3085         mlog_errno(ret);
3086 
3087     mlog(0, "returning ret=%d\n", ret);
3088     return ret;
3089 }
3090 
3091 
3092 /* if there is an existing mle for this lockres, we now know who the master is.
3093  * (the one who sent us *this* message) we can clear it up right away.
3094  * since the process that put the mle on the list still has a reference to it,
3095  * we can unhash it now, set the master and wake the process.  as a result,
3096  * we will have no mle in the list to start with.  now we can add an mle for
3097  * the migration and this should be the only one found for those scanning the
3098  * list.  */
3099 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3100                 void **ret_data)
3101 {
3102     struct dlm_ctxt *dlm = data;
3103     struct dlm_lock_resource *res = NULL;
3104     struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3105     struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3106     const char *name;
3107     unsigned int namelen, hash;
3108     int ret = 0;
3109 
3110     if (!dlm_grab(dlm))
3111         return 0;
3112 
3113     name = migrate->name;
3114     namelen = migrate->namelen;
3115     hash = dlm_lockid_hash(name, namelen);
3116 
3117     /* preallocate.. if this fails, abort */
3118     mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3119 
3120     if (!mle) {
3121         ret = -ENOMEM;
3122         goto leave;
3123     }
3124 
3125     /* check for pre-existing lock */
3126     spin_lock(&dlm->spinlock);
3127     res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3128     if (res) {
3129         spin_lock(&res->spinlock);
3130         if (res->state & DLM_LOCK_RES_RECOVERING) {
3131             /* if all is working ok, this can only mean that we got
3132             * a migrate request from a node that we now see as
3133             * dead.  what can we do here?  drop it to the floor? */
3134             spin_unlock(&res->spinlock);
3135             mlog(ML_ERROR, "Got a migrate request, but the "
3136                  "lockres is marked as recovering!");
3137             kmem_cache_free(dlm_mle_cache, mle);
3138             ret = -EINVAL; /* need a better solution */
3139             goto unlock;
3140         }
3141         res->state |= DLM_LOCK_RES_MIGRATING;
3142         spin_unlock(&res->spinlock);
3143     }
3144 
3145     spin_lock(&dlm->master_lock);
3146     /* ignore status.  only nonzero status would BUG. */
3147     ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3148                     name, namelen,
3149                     migrate->new_master,
3150                     migrate->master);
3151 
3152     if (ret < 0)
3153         kmem_cache_free(dlm_mle_cache, mle);
3154 
3155     spin_unlock(&dlm->master_lock);
3156 unlock:
3157     spin_unlock(&dlm->spinlock);
3158 
3159     if (oldmle) {
3160         /* master is known, detach if not already detached */
3161         dlm_mle_detach_hb_events(dlm, oldmle);
3162         dlm_put_mle(oldmle);
3163     }
3164 
3165     if (res)
3166         dlm_lockres_put(res);
3167 leave:
3168     dlm_put(dlm);
3169     return ret;
3170 }
3171 
3172 /* must be holding dlm->spinlock and dlm->master_lock
3173  * when adding a migration mle, we can clear any other mles
3174  * in the master list because we know with certainty that
3175  * the master is "master".  so we remove any old mle from
3176  * the list after setting it's master field, and then add
3177  * the new migration mle.  this way we can hold with the rule
3178  * of having only one mle for a given lock name at all times. */
3179 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3180                  struct dlm_lock_resource *res,
3181                  struct dlm_master_list_entry *mle,
3182                  struct dlm_master_list_entry **oldmle,
3183                  const char *name, unsigned int namelen,
3184                  u8 new_master, u8 master)
3185 {
3186     int found;
3187     int ret = 0;
3188 
3189     *oldmle = NULL;
3190 
3191     assert_spin_locked(&dlm->spinlock);
3192     assert_spin_locked(&dlm->master_lock);
3193 
3194     /* caller is responsible for any ref taken here on oldmle */
3195     found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3196     if (found) {
3197         struct dlm_master_list_entry *tmp = *oldmle;
3198         spin_lock(&tmp->spinlock);
3199         if (tmp->type == DLM_MLE_MIGRATION) {
3200             if (master == dlm->node_num) {
3201                 /* ah another process raced me to it */
3202                 mlog(0, "tried to migrate %.*s, but some "
3203                      "process beat me to it\n",
3204                      namelen, name);
3205                 spin_unlock(&tmp->spinlock);
3206                 return -EEXIST;
3207             } else {
3208                 /* bad.  2 NODES are trying to migrate! */
3209                 mlog(ML_ERROR, "migration error  mle: "
3210                      "master=%u new_master=%u // request: "
3211                      "master=%u new_master=%u // "
3212                      "lockres=%.*s\n",
3213                      tmp->master, tmp->new_master,
3214                      master, new_master,
3215                      namelen, name);
3216                 BUG();
3217             }
3218         } else {
3219             /* this is essentially what assert_master does */
3220             tmp->master = master;
3221             atomic_set(&tmp->woken, 1);
3222             wake_up(&tmp->wq);
3223             /* remove it so that only one mle will be found */
3224             __dlm_unlink_mle(dlm, tmp);
3225             __dlm_mle_detach_hb_events(dlm, tmp);
3226             if (tmp->type == DLM_MLE_MASTER) {
3227                 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3228                 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3229                         "telling master to get ref "
3230                         "for cleared out mle during "
3231                         "migration\n", dlm->name,
3232                         namelen, name, master,
3233                         new_master);
3234             }
3235         }
3236         spin_unlock(&tmp->spinlock);
3237     }
3238 
3239     /* now add a migration mle to the tail of the list */
3240     dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3241     mle->new_master = new_master;
3242     /* the new master will be sending an assert master for this.
3243      * at that point we will get the refmap reference */
3244     mle->master = master;
3245     /* do this for consistency with other mle types */
3246     set_bit(new_master, mle->maybe_map);
3247     __dlm_insert_mle(dlm, mle);
3248 
3249     return ret;
3250 }
3251 
3252 /*
3253  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3254  */
3255 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3256                     struct dlm_master_list_entry *mle)
3257 {
3258     struct dlm_lock_resource *res;
3259 
3260     /* Find the lockres associated to the mle and set its owner to UNK */
3261     res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3262                    mle->mnamehash);
3263     if (res) {
3264         spin_unlock(&dlm->master_lock);
3265 
3266         /* move lockres onto recovery list */
3267         spin_lock(&res->spinlock);
3268         dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3269         dlm_move_lockres_to_recovery_list(dlm, res);
3270         spin_unlock(&res->spinlock);
3271         dlm_lockres_put(res);
3272 
3273         /* about to get rid of mle, detach from heartbeat */
3274         __dlm_mle_detach_hb_events(dlm, mle);
3275 
3276         /* dump the mle */
3277         spin_lock(&dlm->master_lock);
3278         __dlm_put_mle(mle);
3279         spin_unlock(&dlm->master_lock);
3280     }
3281 
3282     return res;
3283 }
3284 
3285 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3286                     struct dlm_master_list_entry *mle)
3287 {
3288     __dlm_mle_detach_hb_events(dlm, mle);
3289 
3290     spin_lock(&mle->spinlock);
3291     __dlm_unlink_mle(dlm, mle);
3292     atomic_set(&mle->woken, 1);
3293     spin_unlock(&mle->spinlock);
3294 
3295     wake_up(&mle->wq);
3296 }
3297 
3298 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3299                 struct dlm_master_list_entry *mle, u8 dead_node)
3300 {
3301     int bit;
3302 
3303     BUG_ON(mle->type != DLM_MLE_BLOCK);
3304 
3305     spin_lock(&mle->spinlock);
3306     bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
3307     if (bit != dead_node) {
3308         mlog(0, "mle found, but dead node %u would not have been "
3309              "master\n", dead_node);
3310         spin_unlock(&mle->spinlock);
3311     } else {
3312         /* Must drop the refcount by one since the assert_master will
3313          * never arrive. This may result in the mle being unlinked and
3314          * freed, but there may still be a process waiting in the
3315          * dlmlock path which is fine. */
3316         mlog(0, "node %u was expected master\n", dead_node);
3317         atomic_set(&mle->woken, 1);
3318         spin_unlock(&mle->spinlock);
3319         wake_up(&mle->wq);
3320 
3321         /* Do not need events any longer, so detach from heartbeat */
3322         __dlm_mle_detach_hb_events(dlm, mle);
3323         __dlm_put_mle(mle);
3324     }
3325 }
3326 
3327 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3328 {
3329     struct dlm_master_list_entry *mle;
3330     struct dlm_lock_resource *res;
3331     struct hlist_head *bucket;
3332     struct hlist_node *tmp;
3333     unsigned int i;
3334 
3335     mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3336 top:
3337     assert_spin_locked(&dlm->spinlock);
3338 
3339     /* clean the master list */
3340     spin_lock(&dlm->master_lock);
3341     for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3342         bucket = dlm_master_hash(dlm, i);
3343         hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3344             BUG_ON(mle->type != DLM_MLE_BLOCK &&
3345                    mle->type != DLM_MLE_MASTER &&
3346                    mle->type != DLM_MLE_MIGRATION);
3347 
3348             /* MASTER mles are initiated locally. The waiting
3349              * process will notice the node map change shortly.
3350              * Let that happen as normal. */
3351             if (mle->type == DLM_MLE_MASTER)
3352                 continue;
3353 
3354             /* BLOCK mles are initiated by other nodes. Need to
3355              * clean up if the dead node would have been the
3356              * master. */
3357             if (mle->type == DLM_MLE_BLOCK) {
3358                 dlm_clean_block_mle(dlm, mle, dead_node);
3359                 continue;
3360             }
3361 
3362             /* Everything else is a MIGRATION mle */
3363 
3364             /* The rule for MIGRATION mles is that the master
3365              * becomes UNKNOWN if *either* the original or the new
3366              * master dies. All UNKNOWN lockres' are sent to
3367              * whichever node becomes the recovery master. The new
3368              * master is responsible for determining if there is
3369              * still a master for this lockres, or if he needs to
3370              * take over mastery. Either way, this node should
3371              * expect another message to resolve this. */
3372 
3373             if (mle->master != dead_node &&
3374                 mle->new_master != dead_node)
3375                 continue;
3376 
3377             if (mle->new_master == dead_node && mle->inuse) {
3378                 mlog(ML_NOTICE, "%s: target %u died during "
3379                         "migration from %u, the MLE is "
3380                         "still keep used, ignore it!\n",
3381                         dlm->name, dead_node,
3382                         mle->master);
3383                 continue;
3384             }
3385 
3386             /* If we have reached this point, this mle needs to be
3387              * removed from the list and freed. */
3388             dlm_clean_migration_mle(dlm, mle);
3389 
3390             mlog(0, "%s: node %u died during migration from "
3391                  "%u to %u!\n", dlm->name, dead_node, mle->master,
3392                  mle->new_master);
3393 
3394             /* If we find a lockres associated with the mle, we've
3395              * hit this rare case that messes up our lock ordering.
3396              * If so, we need to drop the master lock so that we can
3397              * take the lockres lock, meaning that we will have to
3398              * restart from the head of list. */
3399             res = dlm_reset_mleres_owner(dlm, mle);
3400             if (res)
3401                 /* restart */
3402                 goto top;
3403 
3404             /* This may be the last reference */
3405             __dlm_put_mle(mle);
3406         }
3407     }
3408     spin_unlock(&dlm->master_lock);
3409 }
3410 
3411 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3412              u8 old_master)
3413 {
3414     struct dlm_node_iter iter;
3415     int ret = 0;
3416 
3417     spin_lock(&dlm->spinlock);
3418     dlm_node_iter_init(dlm->domain_map, &iter);
3419     clear_bit(old_master, iter.node_map);
3420     clear_bit(dlm->node_num, iter.node_map);
3421     spin_unlock(&dlm->spinlock);
3422 
3423     /* ownership of the lockres is changing.  account for the
3424      * mastery reference here since old_master will briefly have
3425      * a reference after the migration completes */
3426     spin_lock(&res->spinlock);
3427     dlm_lockres_set_refmap_bit(dlm, res, old_master);
3428     spin_unlock(&res->spinlock);
3429 
3430     mlog(0, "now time to do a migrate request to other nodes\n");
3431     ret = dlm_do_migrate_request(dlm, res, old_master,
3432                      dlm->node_num, &iter);
3433     if (ret < 0) {
3434         mlog_errno(ret);
3435         goto leave;
3436     }
3437 
3438     mlog(0, "doing assert master of %.*s to all except the original node\n",
3439          res->lockname.len, res->lockname.name);
3440     /* this call now finishes out the nodemap
3441      * even if one or more nodes die */
3442     ret = dlm_do_assert_master(dlm, res, iter.node_map,
3443                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3444     if (ret < 0) {
3445         /* no longer need to retry.  all living nodes contacted. */
3446         mlog_errno(ret);
3447         ret = 0;
3448     }
3449 
3450     memset(iter.node_map, 0, sizeof(iter.node_map));
3451     set_bit(old_master, iter.node_map);
3452     mlog(0, "doing assert master of %.*s back to %u\n",
3453          res->lockname.len, res->lockname.name, old_master);
3454     ret = dlm_do_assert_master(dlm, res, iter.node_map,
3455                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3456     if (ret < 0) {
3457         mlog(0, "assert master to original master failed "
3458              "with %d.\n", ret);
3459         /* the only nonzero status here would be because of
3460          * a dead original node.  we're done. */
3461         ret = 0;
3462     }
3463 
3464     /* all done, set the owner, clear the flag */
3465     spin_lock(&res->spinlock);
3466     dlm_set_lockres_owner(dlm, res, dlm->node_num);
3467     res->state &= ~DLM_LOCK_RES_MIGRATING;
3468     spin_unlock(&res->spinlock);
3469     /* re-dirty it on the new master */
3470     dlm_kick_thread(dlm, res);
3471     wake_up(&res->wq);
3472 leave:
3473     return ret;
3474 }
3475 
3476 /*
3477  * LOCKRES AST REFCOUNT
3478  * this is integral to migration
3479  */
3480 
3481 /* for future intent to call an ast, reserve one ahead of time.
3482  * this should be called only after waiting on the lockres
3483  * with dlm_wait_on_lockres, and while still holding the
3484  * spinlock after the call. */
3485 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3486 {
3487     assert_spin_locked(&res->spinlock);
3488     if (res->state & DLM_LOCK_RES_MIGRATING) {
3489         __dlm_print_one_lock_resource(res);
3490     }
3491     BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3492 
3493     atomic_inc(&res->asts_reserved);
3494 }
3495 
3496 /*
3497  * used to drop the reserved ast, either because it went unused,
3498  * or because the ast/bast was actually called.
3499  *
3500  * also, if there is a pending migration on this lockres,
3501  * and this was the last pending ast on the lockres,
3502  * atomically set the MIGRATING flag before we drop the lock.
3503  * this is how we ensure that migration can proceed with no
3504  * asts in progress.  note that it is ok if the state of the
3505  * queues is such that a lock should be granted in the future
3506  * or that a bast should be fired, because the new master will
3507  * shuffle the lists on this lockres as soon as it is migrated.
3508  */
3509 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3510                  struct dlm_lock_resource *res)
3511 {
3512     if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3513         return;
3514 
3515     if (!res->migration_pending) {
3516         spin_unlock(&res->spinlock);
3517         return;
3518     }
3519 
3520     BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3521     res->migration_pending = 0;
3522     res->state |= DLM_LOCK_RES_MIGRATING;
3523     spin_unlock(&res->spinlock);
3524     wake_up(&res->wq);
3525     wake_up(&dlm->migration_wq);
3526 }
3527 
3528 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3529 {
3530     int i;
3531     struct hlist_head *bucket;
3532     struct dlm_master_list_entry *mle;
3533     struct hlist_node *tmp;
3534 
3535     /*
3536      * We notified all other nodes that we are exiting the domain and
3537      * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3538      * around we force free them and wake any processes that are waiting
3539      * on the mles
3540      */
3541     spin_lock(&dlm->spinlock);
3542     spin_lock(&dlm->master_lock);
3543 
3544     BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3545     BUG_ON((find_first_bit(dlm->domain_map, O2NM_MAX_NODES) < O2NM_MAX_NODES));
3546 
3547     for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3548         bucket = dlm_master_hash(dlm, i);
3549         hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3550             if (mle->type != DLM_MLE_BLOCK) {
3551                 mlog(ML_ERROR, "bad mle: %p\n", mle);
3552                 dlm_print_one_mle(mle);
3553             }
3554             atomic_set(&mle->woken, 1);
3555             wake_up(&mle->wq);
3556 
3557             __dlm_unlink_mle(dlm, mle);
3558             __dlm_mle_detach_hb_events(dlm, mle);
3559             __dlm_put_mle(mle);
3560         }
3561     }
3562     spin_unlock(&dlm->master_lock);
3563     spin_unlock(&dlm->spinlock);
3564 }