// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <trace/events/dlm.h>

#include "dlm_internal.h"
#include "lock.h"
#include "user.h"
#include "ast.h"

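/*
 * Global sequence number assigned to every queued callback, protected by
 * dlm_cb_seq_spin.  A seq of zero marks an unused lkb_callbacks[] slot,
 * so zero is never handed out (see the wrap check in dlm_add_cb()).
 */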
static uint64_t dlm_cb_seq;
static DEFINE_SPINLOCK(dlm_cb_seq_spin);

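/*
 * Dump the last bast, the last cast, and every pending entry in the
 * lkb's callback array; called below when the array is found in an
 * unexpected state.
 */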
static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
{
    int i;

    log_print("last_bast %x %llu flags %x mode %d sb %d %x",
          lkb->lkb_id,
          (unsigned long long)lkb->lkb_last_bast.seq,
          lkb->lkb_last_bast.flags,
          lkb->lkb_last_bast.mode,
          lkb->lkb_last_bast.sb_status,
          lkb->lkb_last_bast.sb_flags);

    log_print("last_cast %x %llu flags %x mode %d sb %d %x",
          lkb->lkb_id,
          (unsigned long long)lkb->lkb_last_cast.seq,
          lkb->lkb_last_cast.flags,
          lkb->lkb_last_cast.mode,
          lkb->lkb_last_cast.sb_status,
          lkb->lkb_last_cast.sb_flags);

    for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
        log_print("cb %x %llu flags %x mode %d sb %d %x",
              lkb->lkb_id,
              (unsigned long long)lkb->lkb_callbacks[i].seq,
              lkb->lkb_callbacks[i].flags,
              lkb->lkb_callbacks[i].mode,
              lkb->lkb_callbacks[i].sb_status,
              lkb->lkb_callbacks[i].sb_flags);
    }
}

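/*
 * Queue a completion (cast) or blocking (bast) callback in the first
 * free slot of lkb_callbacks[].  Redundant basts are dropped up front;
 * more are suppressed on removal.  Returns 0 on success (including a
 * silently dropped bast) and -1 if all DLM_CALLBACKS_SIZE slots are in use.
 */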
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
             int status, uint32_t sbflags, uint64_t seq)
{
    struct dlm_ls *ls = lkb->lkb_resource->res_ls;
    uint64_t prev_seq;
    int prev_mode;
    int i, rv;

    for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
        if (lkb->lkb_callbacks[i].seq)
            continue;

        /*
         * Suppress some redundant basts here, do more on removal.
         * Don't even add a bast if the callback just before it
         * is a bast for the same mode or a more restrictive mode.
         * (the additional > PR check is needed for PR/CW inversion)
         */

        if ((i > 0) && (flags & DLM_CB_BAST) &&
            (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {

            prev_seq = lkb->lkb_callbacks[i-1].seq;
            prev_mode = lkb->lkb_callbacks[i-1].mode;

            if ((prev_mode == mode) ||
                (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {

                log_debug(ls, "skip %x add bast %llu mode %d "
                      "for bast %llu mode %d",
                      lkb->lkb_id,
                      (unsigned long long)seq,
                      mode,
                      (unsigned long long)prev_seq,
                      prev_mode);
                rv = 0;
                goto out;
            }
        }

        lkb->lkb_callbacks[i].seq = seq;
        lkb->lkb_callbacks[i].flags = flags;
        lkb->lkb_callbacks[i].mode = mode;
        lkb->lkb_callbacks[i].sb_status = status;
        lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
        rv = 0;
        break;
    }

    if (i == DLM_CALLBACKS_SIZE) {
        log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
              lkb->lkb_id, (unsigned long long)seq,
              flags, mode, status, sbflags);
        dlm_dump_lkb_callbacks(lkb);
        rv = -1;
        goto out;
    }
 out:
    return rv;
}

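/*
 * Copy the oldest queued callback (callbacks[0]) into *cb and shift the
 * rest down; *resid returns how many callbacks remain queued.  A bast
 * whose mode is compatible with the last granted mode is flagged
 * DLM_CB_SKIP instead of being delivered.  Returns -ENOENT if nothing
 * is queued.
 */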
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
             struct dlm_callback *cb, int *resid)
{
    int i, rv;

    *resid = 0;

    if (!lkb->lkb_callbacks[0].seq) {
        rv = -ENOENT;
        goto out;
    }

    /* oldest undelivered cb is callbacks[0] */

    memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
    memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));

    /* shift others down */

    for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
        if (!lkb->lkb_callbacks[i].seq)
            break;
        memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
               sizeof(struct dlm_callback));
        memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
        (*resid)++;
    }

    /* if cb is a bast, it should be skipped if the blocking mode is
       compatible with the last granted mode */

    if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
        if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
            cb->flags |= DLM_CB_SKIP;

            log_debug(ls, "skip %x bast %llu mode %d "
                  "for cast %llu mode %d",
                  lkb->lkb_id,
                  (unsigned long long)cb->seq,
                  cb->mode,
                  (unsigned long long)lkb->lkb_last_cast.seq,
                  lkb->lkb_last_cast.mode);
            rv = 0;
            goto out;
        }
    }

    if (cb->flags & DLM_CB_CAST) {
        memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
        lkb->lkb_last_cast_time = ktime_get();
    }

    if (cb->flags & DLM_CB_BAST) {
        memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
        lkb->lkb_last_bast_time = ktime_get();
    }
    rv = 0;
 out:
    return rv;
}

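/*
 * Queue an ast/bast for delivery to the lock owner.  Assigns the next
 * global sequence number, hands user-space locks to dlm_user_add_ast(),
 * and otherwise queues callback work (or parks it on ls_cb_delay while
 * LSFL_CB_DELAY is set).  A reference on the lkb is taken when work is
 * first queued and dropped in dlm_callback_work().
 */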
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
        uint32_t sbflags)
{
    struct dlm_ls *ls = lkb->lkb_resource->res_ls;
    uint64_t new_seq, prev_seq;
    int rv;

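    /* seq 0 marks an unused callback slot, so skip it if the counter wraps */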
    spin_lock(&dlm_cb_seq_spin);
    new_seq = ++dlm_cb_seq;
    if (!dlm_cb_seq)
        new_seq = ++dlm_cb_seq;
    spin_unlock(&dlm_cb_seq_spin);

    if (lkb->lkb_flags & DLM_IFL_USER) {
        dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
        return;
    }

    mutex_lock(&lkb->lkb_cb_mutex);
    prev_seq = lkb->lkb_callbacks[0].seq;

    rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
    if (rv < 0)
        goto out;

    if (!prev_seq) {
        kref_get(&lkb->lkb_ref);

        if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
            mutex_lock(&ls->ls_cb_mutex);
            list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
            mutex_unlock(&ls->ls_cb_mutex);
        } else {
            queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
        }
    }
 out:
    mutex_unlock(&lkb->lkb_cb_mutex);
}

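/*
 * Workqueue handler: drain every queued callback for the lkb under
 * lkb_cb_mutex, then deliver each one to the lock's astfn/bastfn with
 * the mutex released.  Drops the reference taken in dlm_add_cb().
 */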
void dlm_callback_work(struct work_struct *work)
{
    struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
    struct dlm_ls *ls = lkb->lkb_resource->res_ls;
    void (*castfn) (void *astparam);
    void (*bastfn) (void *astparam, int mode);
    struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
    int i, rv, resid;

    memset(&callbacks, 0, sizeof(callbacks));

    mutex_lock(&lkb->lkb_cb_mutex);
    if (!lkb->lkb_callbacks[0].seq) {
        /* no callback work exists, shouldn't happen */
        log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
        dlm_print_lkb(lkb);
        dlm_dump_lkb_callbacks(lkb);
    }

    for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
        rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
        if (rv < 0)
            break;
    }

    if (resid) {
        /* cbs remain, loop should have removed all, shouldn't happen */
        log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
              resid);
        dlm_print_lkb(lkb);
        dlm_dump_lkb_callbacks(lkb);
    }
    mutex_unlock(&lkb->lkb_cb_mutex);

    castfn = lkb->lkb_astfn;
    bastfn = lkb->lkb_bastfn;

    for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
        if (!callbacks[i].seq)
            break;
        if (callbacks[i].flags & DLM_CB_SKIP) {
            continue;
        } else if (callbacks[i].flags & DLM_CB_BAST) {
            trace_dlm_bast(ls, lkb, callbacks[i].mode);
            bastfn(lkb->lkb_astparam, callbacks[i].mode);
        } else if (callbacks[i].flags & DLM_CB_CAST) {
            lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
            lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
            trace_dlm_ast(ls, lkb);
            castfn(lkb->lkb_astparam);
        }
    }

    /* undo kref_get from dlm_add_cb, may cause lkb to be freed */
    dlm_put_lkb(lkb);
}

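/* create the per-lockspace workqueue that dlm_callback_work() runs on */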
int dlm_callback_start(struct dlm_ls *ls)
{
    ls->ls_callback_wq = alloc_workqueue("dlm_callback",
                         WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
    if (!ls->ls_callback_wq) {
        log_print("can't start dlm_callback workqueue");
        return -ENOMEM;
    }
    return 0;
}

void dlm_callback_stop(struct dlm_ls *ls)
{
    if (ls->ls_callback_wq)
        destroy_workqueue(ls->ls_callback_wq);
}

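/*
 * Suspend callback delivery: with LSFL_CB_DELAY set, dlm_add_cb() parks
 * new work on ls_cb_delay instead of queueing it, and any work already
 * queued is flushed before returning.
 */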
void dlm_callback_suspend(struct dlm_ls *ls)
{
    set_bit(LSFL_CB_DELAY, &ls->ls_flags);

    if (ls->ls_callback_wq)
        flush_workqueue(ls->ls_callback_wq);
}

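/*
 * Resume callback delivery: requeue the lkbs parked on ls_cb_delay in
 * batches of MAX_CB_QUEUE, with a cond_resched() between batches.
 */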
#define MAX_CB_QUEUE 25

void dlm_callback_resume(struct dlm_ls *ls)
{
    struct dlm_lkb *lkb, *safe;
    int count = 0, sum = 0;
    bool empty;

    clear_bit(LSFL_CB_DELAY, &ls->ls_flags);

    if (!ls->ls_callback_wq)
        return;

more:
    mutex_lock(&ls->ls_cb_mutex);
    list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
        list_del_init(&lkb->lkb_cb_list);
        queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
        count++;
        if (count == MAX_CB_QUEUE)
            break;
    }
    empty = list_empty(&ls->ls_cb_delay);
    mutex_unlock(&ls->ls_cb_mutex);

    sum += count;
    if (!empty) {
        count = 0;
        cond_resched();
        goto more;
    }

    if (sum)
        log_rinfo(ls, "%s %d", __func__, sum);
}