Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003  * Copyright (C) 2019 Oracle.  All Rights Reserved.
0004  * Author: Darrick J. Wong <darrick.wong@oracle.com>
0005  */
0006 #include "xfs.h"
0007 #include "xfs_fs.h"
0008 #include "xfs_shared.h"
0009 #include "xfs_format.h"
0010 #include "xfs_log_format.h"
0011 #include "xfs_trans_resv.h"
0012 #include "xfs_mount.h"
0013 #include "xfs_trace.h"
0014 #include "xfs_sysctl.h"
0015 #include "xfs_pwork.h"
0016 #include <linux/nmi.h>
0017 
0018 /*
0019  * Parallel Work Queue
0020  * ===================
0021  *
0022  * Abstract away the details of running a large and "obviously" parallelizable
0023  * task across multiple CPUs.  Callers initialize the pwork control object with
0024  * a desired level of parallelization and a work function.  Next, they embed
0025  * struct xfs_pwork in whatever structure they use to pass work context to a
0026  * worker thread and queue that pwork.  The work function will be passed the
0027  * pwork item when it is run (from process context) and any returned error will
0028  * be recorded in xfs_pwork_ctl.error.  Work functions should check for errors
0029  * and abort if necessary; the non-zeroness of xfs_pwork_ctl.error does not
0030  * stop workqueue item processing.
0031  *
0032  * This is the rough equivalent of the xfsprogs workqueue code, though we can't
0033  * reuse that name here.
0034  */
0035 
0036 /* Invoke our caller's function. */
0037 static void
0038 xfs_pwork_work(
0039     struct work_struct  *work)
0040 {
0041     struct xfs_pwork    *pwork;
0042     struct xfs_pwork_ctl    *pctl;
0043     int         error;
0044 
0045     pwork = container_of(work, struct xfs_pwork, work);
0046     pctl = pwork->pctl;
0047     error = pctl->work_fn(pctl->mp, pwork);
0048     if (error && !pctl->error)
0049         pctl->error = error;
0050     if (atomic_dec_and_test(&pctl->nr_work))
0051         wake_up(&pctl->poll_wait);
0052 }
0053 
0054 /*
0055  * Set up control data for parallel work.  @work_fn is the function that will
0056  * be called.  @tag will be written into the kernel threads.  @nr_threads is
0057  * the level of parallelism desired, or 0 for no limit.
0058  */
0059 int
0060 xfs_pwork_init(
0061     struct xfs_mount    *mp,
0062     struct xfs_pwork_ctl    *pctl,
0063     xfs_pwork_work_fn   work_fn,
0064     const char      *tag)
0065 {
0066     unsigned int        nr_threads = 0;
0067 
0068 #ifdef DEBUG
0069     if (xfs_globals.pwork_threads >= 0)
0070         nr_threads = xfs_globals.pwork_threads;
0071 #endif
0072     trace_xfs_pwork_init(mp, nr_threads, current->pid);
0073 
0074     pctl->wq = alloc_workqueue("%s-%d",
0075             WQ_UNBOUND | WQ_SYSFS | WQ_FREEZABLE, nr_threads, tag,
0076             current->pid);
0077     if (!pctl->wq)
0078         return -ENOMEM;
0079     pctl->work_fn = work_fn;
0080     pctl->error = 0;
0081     pctl->mp = mp;
0082     atomic_set(&pctl->nr_work, 0);
0083     init_waitqueue_head(&pctl->poll_wait);
0084 
0085     return 0;
0086 }
0087 
0088 /* Queue some parallel work. */
0089 void
0090 xfs_pwork_queue(
0091     struct xfs_pwork_ctl    *pctl,
0092     struct xfs_pwork    *pwork)
0093 {
0094     INIT_WORK(&pwork->work, xfs_pwork_work);
0095     pwork->pctl = pctl;
0096     atomic_inc(&pctl->nr_work);
0097     queue_work(pctl->wq, &pwork->work);
0098 }
0099 
0100 /* Wait for the work to finish and tear down the control structure. */
0101 int
0102 xfs_pwork_destroy(
0103     struct xfs_pwork_ctl    *pctl)
0104 {
0105     destroy_workqueue(pctl->wq);
0106     pctl->wq = NULL;
0107     return pctl->error;
0108 }
0109 
0110 /*
0111  * Wait for the work to finish by polling completion status and touch the soft
0112  * lockup watchdog.  This is for callers such as mount which hold locks.
0113  */
0114 void
0115 xfs_pwork_poll(
0116     struct xfs_pwork_ctl    *pctl)
0117 {
0118     while (wait_event_timeout(pctl->poll_wait,
0119                 atomic_read(&pctl->nr_work) == 0, HZ) == 0)
0120         touch_softlockup_watchdog();
0121 }