// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "member.h"
#include "lock.h"
#include "dir.h"
#include "config.h"
#include "requestqueue.h"
#include "util.h"

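/*
 * While a lockspace is in recovery, incoming requests are saved by
 * dlm_add_requestqueue().  After recovery, dlm_recoverd drains the
 * saved requests in order with dlm_process_requestqueue(), dlm_recv
 * waits for that drain to finish in dlm_wait_requestqueue(), and
 * requests made stale by the recovery itself are dropped by
 * dlm_purge_requestqueue().
 */
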
struct rq_entry {
    struct list_head list;
    uint32_t recover_seq;
    int nodeid;
    struct dlm_message request;
};
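
/*
 * dlm_add_requestqueue() allocates this struct with extra space after
 * it and copies in the full message (h_length bytes, which can exceed
 * sizeof(struct dlm_message)), so "request" must stay the last member.
 */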

/*
 * Requests received while the lockspace is in recovery get added to the
 * request queue and processed when recovery is complete.  This happens when
 * the lockspace is suspended on some nodes before it is on others, or the
 * lockspace is enabled on some while still suspended on others.
 */

void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
{
    struct rq_entry *e;
    int length = le16_to_cpu(ms->m_header.h_length) -
        sizeof(struct dlm_message);

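    /* GFP_NOFS: this allocation must not recurse into filesystem
       reclaim, which could deadlock the filesystem using the dlm */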
    e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
    if (!e) {
        log_print("dlm_add_requestqueue: out of memory len %d", length);
        return;
    }

    e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
    e->nodeid = nodeid;
    memcpy(&e->request, ms, le16_to_cpu(ms->m_header.h_length));

    atomic_inc(&ls->ls_requestqueue_cnt);
    mutex_lock(&ls->ls_requestqueue_mutex);
    list_add_tail(&e->list, &ls->ls_requestqueue);
    mutex_unlock(&ls->ls_requestqueue_mutex);
}

/*
 * Called by dlm_recoverd to process normal messages saved while recovery was
 * happening.  Normal locking has been enabled before this is called.  dlm_recv,
 * upon receiving a message, will wait for all saved messages to be drained
 * here before processing the message it got.  If a new dlm_ls_stop() arrives
 * while we're processing these saved messages, it may block trying to suspend
 * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.  In that
 * case, we don't abort since locking_stopped is still 0.  If dlm_recv is not
 * waiting for us, then this processing may be aborted due to locking_stopped.
 */

int dlm_process_requestqueue(struct dlm_ls *ls)
{
    struct rq_entry *e;
    struct dlm_message *ms;
    int error = 0;

    mutex_lock(&ls->ls_requestqueue_mutex);

    for (;;) {
        if (list_empty(&ls->ls_requestqueue)) {
            mutex_unlock(&ls->ls_requestqueue_mutex);
            error = 0;
            break;
        }
        e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
        mutex_unlock(&ls->ls_requestqueue_mutex);

        ms = &e->request;

        log_limit(ls, "dlm_process_requestqueue msg %d from %d "
              "lkid %x remid %x result %d seq %u",
              le32_to_cpu(ms->m_type),
              le32_to_cpu(ms->m_header.h_nodeid),
              le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
              from_dlm_errno(le32_to_cpu(ms->m_result)),
              e->recover_seq);

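        /*
         * Replay the saved message as dlm_recv would have handled it,
         * tagged with the recovery sequence under which it was saved.
         */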
        dlm_receive_message_saved(ls, &e->request, e->recover_seq);

        mutex_lock(&ls->ls_requestqueue_mutex);
        list_del(&e->list);
        if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
            wake_up(&ls->ls_requestqueue_wait);
        kfree(e);

        if (dlm_locking_stopped(ls)) {
            log_debug(ls, "process_requestqueue abort running");
            mutex_unlock(&ls->ls_requestqueue_mutex);
            error = -EINTR;
            break;
        }
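        /* yield between messages so draining a long queue does not
           monopolize the CPU */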
        schedule();
    }

    return error;
}

/*
 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 * saved requests and processes them as they would have been by dlm_recv.  At
 * the same time, dlm_recv will start receiving new requests from remote nodes.
 * We want to delay dlm_recv processing new requests until dlm_recoverd has
 * finished processing the old saved requests.  We don't check for locking
 * stopped here because dlm_ls_stop won't stop locking until it's suspended us
 * (dlm_recv).
 */

void dlm_wait_requestqueue(struct dlm_ls *ls)
{
    wait_event(ls->ls_requestqueue_wait,
           atomic_read(&ls->ls_requestqueue_cnt) == 0);
}

static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
    __le32 type = ms->m_type;

    /* the ls is being cleaned up and freed by release_lockspace */
    if (!atomic_read(&ls->ls_count))
        return 1;

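    /* the sending node was removed from the lockspace, so its saved
       requests are stale */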
    if (dlm_is_removed(ls, nodeid))
        return 1;

    /* directory operations are always purged because the directory is
       always rebuilt during recovery and the lookups resent */

    if (type == cpu_to_le32(DLM_MSG_REMOVE) ||
        type == cpu_to_le32(DLM_MSG_LOOKUP) ||
        type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY))
        return 1;

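    /* with a resource directory the remaining message types are kept;
       without one, everything else is purged as well */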
    if (!dlm_no_directory(ls))
        return 0;

    return 1;
}

void dlm_purge_requestqueue(struct dlm_ls *ls)
{
    struct dlm_message *ms;
    struct rq_entry *e, *safe;

    mutex_lock(&ls->ls_requestqueue_mutex);
    list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
        ms = &e->request;

        if (purge_request(ls, ms, e->nodeid)) {
            list_del(&e->list);
            if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
                wake_up(&ls->ls_requestqueue_wait);
            kfree(e);
        }
    }
    mutex_unlock(&ls->ls_requestqueue_mutex);
}