0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "dlm_internal.h"
0012 #include "member.h"
0013 #include "lock.h"
0014 #include "dir.h"
0015 #include "config.h"
0016 #include "requestqueue.h"
0017 #include "util.h"
0018
0019 struct rq_entry {
0020 struct list_head list;
0021 uint32_t recover_seq;
0022 int nodeid;
0023 struct dlm_message request;
0024 };
0025
0026
0027
0028
0029
0030
0031
0032
0033 void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
0034 {
0035 struct rq_entry *e;
0036 int length = le16_to_cpu(ms->m_header.h_length) -
0037 sizeof(struct dlm_message);
0038
0039 e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
0040 if (!e) {
0041 log_print("dlm_add_requestqueue: out of memory len %d", length);
0042 return;
0043 }
0044
0045 e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
0046 e->nodeid = nodeid;
0047 memcpy(&e->request, ms, le16_to_cpu(ms->m_header.h_length));
0048
0049 atomic_inc(&ls->ls_requestqueue_cnt);
0050 mutex_lock(&ls->ls_requestqueue_mutex);
0051 list_add_tail(&e->list, &ls->ls_requestqueue);
0052 mutex_unlock(&ls->ls_requestqueue_mutex);
0053 }
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066 int dlm_process_requestqueue(struct dlm_ls *ls)
0067 {
0068 struct rq_entry *e;
0069 struct dlm_message *ms;
0070 int error = 0;
0071
0072 mutex_lock(&ls->ls_requestqueue_mutex);
0073
0074 for (;;) {
0075 if (list_empty(&ls->ls_requestqueue)) {
0076 mutex_unlock(&ls->ls_requestqueue_mutex);
0077 error = 0;
0078 break;
0079 }
0080 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
0081 mutex_unlock(&ls->ls_requestqueue_mutex);
0082
0083 ms = &e->request;
0084
0085 log_limit(ls, "dlm_process_requestqueue msg %d from %d "
0086 "lkid %x remid %x result %d seq %u",
0087 le32_to_cpu(ms->m_type),
0088 le32_to_cpu(ms->m_header.h_nodeid),
0089 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
0090 from_dlm_errno(le32_to_cpu(ms->m_result)),
0091 e->recover_seq);
0092
0093 dlm_receive_message_saved(ls, &e->request, e->recover_seq);
0094
0095 mutex_lock(&ls->ls_requestqueue_mutex);
0096 list_del(&e->list);
0097 if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
0098 wake_up(&ls->ls_requestqueue_wait);
0099 kfree(e);
0100
0101 if (dlm_locking_stopped(ls)) {
0102 log_debug(ls, "process_requestqueue abort running");
0103 mutex_unlock(&ls->ls_requestqueue_mutex);
0104 error = -EINTR;
0105 break;
0106 }
0107 schedule();
0108 }
0109
0110 return error;
0111 }
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123 void dlm_wait_requestqueue(struct dlm_ls *ls)
0124 {
0125 wait_event(ls->ls_requestqueue_wait,
0126 atomic_read(&ls->ls_requestqueue_cnt) == 0);
0127 }
0128
0129 static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
0130 {
0131 __le32 type = ms->m_type;
0132
0133
0134 if (!atomic_read(&ls->ls_count))
0135 return 1;
0136
0137 if (dlm_is_removed(ls, nodeid))
0138 return 1;
0139
0140
0141
0142
0143 if (type == cpu_to_le32(DLM_MSG_REMOVE) ||
0144 type == cpu_to_le32(DLM_MSG_LOOKUP) ||
0145 type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY))
0146 return 1;
0147
0148 if (!dlm_no_directory(ls))
0149 return 0;
0150
0151 return 1;
0152 }
0153
0154 void dlm_purge_requestqueue(struct dlm_ls *ls)
0155 {
0156 struct dlm_message *ms;
0157 struct rq_entry *e, *safe;
0158
0159 mutex_lock(&ls->ls_requestqueue_mutex);
0160 list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
0161 ms = &e->request;
0162
0163 if (purge_request(ls, ms, e->nodeid)) {
0164 list_del(&e->list);
0165 if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
0166 wake_up(&ls->ls_requestqueue_wait);
0167 kfree(e);
0168 }
0169 }
0170 mutex_unlock(&ls->ls_requestqueue_mutex);
0171 }
0172