0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #include <trace/events/dlm.h>
0013
0014 #include "dlm_internal.h"
0015 #include "lock.h"
0016 #include "user.h"
0017 #include "ast.h"
0018
0019 static uint64_t dlm_cb_seq;
0020 static DEFINE_SPINLOCK(dlm_cb_seq_spin);
0021
0022 static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
0023 {
0024 int i;
0025
0026 log_print("last_bast %x %llu flags %x mode %d sb %d %x",
0027 lkb->lkb_id,
0028 (unsigned long long)lkb->lkb_last_bast.seq,
0029 lkb->lkb_last_bast.flags,
0030 lkb->lkb_last_bast.mode,
0031 lkb->lkb_last_bast.sb_status,
0032 lkb->lkb_last_bast.sb_flags);
0033
0034 log_print("last_cast %x %llu flags %x mode %d sb %d %x",
0035 lkb->lkb_id,
0036 (unsigned long long)lkb->lkb_last_cast.seq,
0037 lkb->lkb_last_cast.flags,
0038 lkb->lkb_last_cast.mode,
0039 lkb->lkb_last_cast.sb_status,
0040 lkb->lkb_last_cast.sb_flags);
0041
0042 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
0043 log_print("cb %x %llu flags %x mode %d sb %d %x",
0044 lkb->lkb_id,
0045 (unsigned long long)lkb->lkb_callbacks[i].seq,
0046 lkb->lkb_callbacks[i].flags,
0047 lkb->lkb_callbacks[i].mode,
0048 lkb->lkb_callbacks[i].sb_status,
0049 lkb->lkb_callbacks[i].sb_flags);
0050 }
0051 }
0052
0053 int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
0054 int status, uint32_t sbflags, uint64_t seq)
0055 {
0056 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
0057 uint64_t prev_seq;
0058 int prev_mode;
0059 int i, rv;
0060
0061 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
0062 if (lkb->lkb_callbacks[i].seq)
0063 continue;
0064
0065
0066
0067
0068
0069
0070
0071
0072 if ((i > 0) && (flags & DLM_CB_BAST) &&
0073 (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {
0074
0075 prev_seq = lkb->lkb_callbacks[i-1].seq;
0076 prev_mode = lkb->lkb_callbacks[i-1].mode;
0077
0078 if ((prev_mode == mode) ||
0079 (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
0080
0081 log_debug(ls, "skip %x add bast %llu mode %d "
0082 "for bast %llu mode %d",
0083 lkb->lkb_id,
0084 (unsigned long long)seq,
0085 mode,
0086 (unsigned long long)prev_seq,
0087 prev_mode);
0088 rv = 0;
0089 goto out;
0090 }
0091 }
0092
0093 lkb->lkb_callbacks[i].seq = seq;
0094 lkb->lkb_callbacks[i].flags = flags;
0095 lkb->lkb_callbacks[i].mode = mode;
0096 lkb->lkb_callbacks[i].sb_status = status;
0097 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
0098 rv = 0;
0099 break;
0100 }
0101
0102 if (i == DLM_CALLBACKS_SIZE) {
0103 log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
0104 lkb->lkb_id, (unsigned long long)seq,
0105 flags, mode, status, sbflags);
0106 dlm_dump_lkb_callbacks(lkb);
0107 rv = -1;
0108 goto out;
0109 }
0110 out:
0111 return rv;
0112 }
0113
0114 int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
0115 struct dlm_callback *cb, int *resid)
0116 {
0117 int i, rv;
0118
0119 *resid = 0;
0120
0121 if (!lkb->lkb_callbacks[0].seq) {
0122 rv = -ENOENT;
0123 goto out;
0124 }
0125
0126
0127
0128 memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
0129 memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));
0130
0131
0132
0133 for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
0134 if (!lkb->lkb_callbacks[i].seq)
0135 break;
0136 memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
0137 sizeof(struct dlm_callback));
0138 memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
0139 (*resid)++;
0140 }
0141
0142
0143
0144
0145 if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
0146 if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
0147 cb->flags |= DLM_CB_SKIP;
0148
0149 log_debug(ls, "skip %x bast %llu mode %d "
0150 "for cast %llu mode %d",
0151 lkb->lkb_id,
0152 (unsigned long long)cb->seq,
0153 cb->mode,
0154 (unsigned long long)lkb->lkb_last_cast.seq,
0155 lkb->lkb_last_cast.mode);
0156 rv = 0;
0157 goto out;
0158 }
0159 }
0160
0161 if (cb->flags & DLM_CB_CAST) {
0162 memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
0163 lkb->lkb_last_cast_time = ktime_get();
0164 }
0165
0166 if (cb->flags & DLM_CB_BAST) {
0167 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
0168 lkb->lkb_last_bast_time = ktime_get();
0169 }
0170 rv = 0;
0171 out:
0172 return rv;
0173 }
0174
0175 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
0176 uint32_t sbflags)
0177 {
0178 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
0179 uint64_t new_seq, prev_seq;
0180 int rv;
0181
0182 spin_lock(&dlm_cb_seq_spin);
0183 new_seq = ++dlm_cb_seq;
0184 if (!dlm_cb_seq)
0185 new_seq = ++dlm_cb_seq;
0186 spin_unlock(&dlm_cb_seq_spin);
0187
0188 if (lkb->lkb_flags & DLM_IFL_USER) {
0189 dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
0190 return;
0191 }
0192
0193 mutex_lock(&lkb->lkb_cb_mutex);
0194 prev_seq = lkb->lkb_callbacks[0].seq;
0195
0196 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
0197 if (rv < 0)
0198 goto out;
0199
0200 if (!prev_seq) {
0201 kref_get(&lkb->lkb_ref);
0202
0203 if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
0204 mutex_lock(&ls->ls_cb_mutex);
0205 list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
0206 mutex_unlock(&ls->ls_cb_mutex);
0207 } else {
0208 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
0209 }
0210 }
0211 out:
0212 mutex_unlock(&lkb->lkb_cb_mutex);
0213 }
0214
0215 void dlm_callback_work(struct work_struct *work)
0216 {
0217 struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
0218 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
0219 void (*castfn) (void *astparam);
0220 void (*bastfn) (void *astparam, int mode);
0221 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
0222 int i, rv, resid;
0223
0224 memset(&callbacks, 0, sizeof(callbacks));
0225
0226 mutex_lock(&lkb->lkb_cb_mutex);
0227 if (!lkb->lkb_callbacks[0].seq) {
0228
0229 log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
0230 dlm_print_lkb(lkb);
0231 dlm_dump_lkb_callbacks(lkb);
0232 }
0233
0234 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
0235 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
0236 if (rv < 0)
0237 break;
0238 }
0239
0240 if (resid) {
0241
0242 log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
0243 resid);
0244 dlm_print_lkb(lkb);
0245 dlm_dump_lkb_callbacks(lkb);
0246 }
0247 mutex_unlock(&lkb->lkb_cb_mutex);
0248
0249 castfn = lkb->lkb_astfn;
0250 bastfn = lkb->lkb_bastfn;
0251
0252 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
0253 if (!callbacks[i].seq)
0254 break;
0255 if (callbacks[i].flags & DLM_CB_SKIP) {
0256 continue;
0257 } else if (callbacks[i].flags & DLM_CB_BAST) {
0258 trace_dlm_bast(ls, lkb, callbacks[i].mode);
0259 bastfn(lkb->lkb_astparam, callbacks[i].mode);
0260 } else if (callbacks[i].flags & DLM_CB_CAST) {
0261 lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
0262 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
0263 trace_dlm_ast(ls, lkb);
0264 castfn(lkb->lkb_astparam);
0265 }
0266 }
0267
0268
0269 dlm_put_lkb(lkb);
0270 }
0271
0272 int dlm_callback_start(struct dlm_ls *ls)
0273 {
0274 ls->ls_callback_wq = alloc_workqueue("dlm_callback",
0275 WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
0276 if (!ls->ls_callback_wq) {
0277 log_print("can't start dlm_callback workqueue");
0278 return -ENOMEM;
0279 }
0280 return 0;
0281 }
0282
0283 void dlm_callback_stop(struct dlm_ls *ls)
0284 {
0285 if (ls->ls_callback_wq)
0286 destroy_workqueue(ls->ls_callback_wq);
0287 }
0288
0289 void dlm_callback_suspend(struct dlm_ls *ls)
0290 {
0291 set_bit(LSFL_CB_DELAY, &ls->ls_flags);
0292
0293 if (ls->ls_callback_wq)
0294 flush_workqueue(ls->ls_callback_wq);
0295 }
0296
0297 #define MAX_CB_QUEUE 25
0298
0299 void dlm_callback_resume(struct dlm_ls *ls)
0300 {
0301 struct dlm_lkb *lkb, *safe;
0302 int count = 0, sum = 0;
0303 bool empty;
0304
0305 clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
0306
0307 if (!ls->ls_callback_wq)
0308 return;
0309
0310 more:
0311 mutex_lock(&ls->ls_cb_mutex);
0312 list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
0313 list_del_init(&lkb->lkb_cb_list);
0314 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
0315 count++;
0316 if (count == MAX_CB_QUEUE)
0317 break;
0318 }
0319 empty = list_empty(&ls->ls_cb_delay);
0320 mutex_unlock(&ls->ls_cb_mutex);
0321
0322 sum += count;
0323 if (!empty) {
0324 count = 0;
0325 cond_resched();
0326 goto more;
0327 }
0328
0329 if (sum)
0330 log_rinfo(ls, "%s %d", __func__, sum);
0331 }
0332