Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * Copyright (C) 2015 Davidlohr Bueso.
0004  *
0005  * Block a bunch of threads and let parallel waker threads wakeup an
0006  * equal amount of them. The program output reflects the avg latency
0007  * for each individual thread to service its share of work. Ultimately
0008  * it can be used to measure futex_wake() changes.
0009  */
0010 #include "bench.h"
0011 #include <linux/compiler.h>
0012 #include "../util/debug.h"
0013 
0014 #ifndef HAVE_PTHREAD_BARRIER
0015 int bench_futex_wake_parallel(int argc __maybe_unused, const char **argv __maybe_unused)
0016 {
0017     pr_err("%s: pthread_barrier_t unavailable, disabling this test...\n", __func__);
0018     return 0;
0019 }
0020 #else /* HAVE_PTHREAD_BARRIER */
0021 /* For the CLR_() macros */
0022 #include <string.h>
0023 #include <pthread.h>
0024 
0025 #include <signal.h>
0026 #include "../util/stat.h"
0027 #include <subcmd/parse-options.h>
0028 #include <linux/kernel.h>
0029 #include <linux/time64.h>
0030 #include <errno.h>
0031 #include "futex.h"
0032 #include <perf/cpumap.h>
0033 
0034 #include <err.h>
0035 #include <stdlib.h>
0036 #include <sys/time.h>
0037 #include <sys/mman.h>
0038 
/* Per-waker bookkeeping; one slot is handed to each waking thread. */
struct thread_data {
    /* Handle filled in by pthread_create() and later joined on. */
    pthread_t worker;
    /* Number of tasks the waker's futex_wake() call reported as woken. */
    unsigned int nwoken;
    /* Elapsed time measured around the waker's futex_wake() call. */
    struct timeval runtime;
};
0044 
0045 static unsigned int nwakes = 1;
0046 
0047 /* all threads will block on the same futex -- hash bucket chaos ;) */
0048 static u_int32_t futex = 0;
0049 
0050 static pthread_t *blocked_worker;
0051 static bool done = false;
0052 static pthread_mutex_t thread_lock;
0053 static pthread_cond_t thread_parent, thread_worker;
0054 static pthread_barrier_t barrier;
0055 static struct stats waketime_stats, wakeup_stats;
0056 static unsigned int threads_starting;
0057 static int futex_flag = 0;
0058 
0059 static struct bench_futex_parameters params;
0060 
0061 static const struct option options[] = {
0062     OPT_UINTEGER('t', "threads", &params.nthreads, "Specify amount of threads"),
0063     OPT_UINTEGER('w', "nwakers", &params.nwakes, "Specify amount of waking threads"),
0064     OPT_BOOLEAN( 's', "silent",  &params.silent, "Silent mode: do not display data/details"),
0065     OPT_BOOLEAN( 'S', "shared",  &params.fshared, "Use shared futexes instead of private ones"),
0066     OPT_BOOLEAN( 'm', "mlockall", &params.mlockall, "Lock all current and future memory"),
0067 
0068     OPT_END()
0069 };
0070 
/* NULL-terminated usage lines handed to the option parser. */
static const char * const bench_futex_wake_parallel_usage[] = {
    "perf bench futex wake-parallel <options>",
    NULL,
};
0075 
0076 static void *waking_workerfn(void *arg)
0077 {
0078     struct thread_data *waker = (struct thread_data *) arg;
0079     struct timeval start, end;
0080 
0081     pthread_barrier_wait(&barrier);
0082 
0083     gettimeofday(&start, NULL);
0084 
0085     waker->nwoken = futex_wake(&futex, nwakes, futex_flag);
0086     if (waker->nwoken != nwakes)
0087         warnx("couldn't wakeup all tasks (%d/%d)",
0088               waker->nwoken, nwakes);
0089 
0090     gettimeofday(&end, NULL);
0091     timersub(&end, &start, &waker->runtime);
0092 
0093     pthread_exit(NULL);
0094     return NULL;
0095 }
0096 
0097 static void wakeup_threads(struct thread_data *td, pthread_attr_t thread_attr)
0098 {
0099     unsigned int i;
0100 
0101     pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
0102 
0103     pthread_barrier_init(&barrier, NULL, params.nwakes + 1);
0104 
0105     /* create and block all threads */
0106     for (i = 0; i < params.nwakes; i++) {
0107         /*
0108          * Thread creation order will impact per-thread latency
0109          * as it will affect the order to acquire the hb spinlock.
0110          * For now let the scheduler decide.
0111          */
0112         if (pthread_create(&td[i].worker, &thread_attr,
0113                    waking_workerfn, (void *)&td[i]))
0114             err(EXIT_FAILURE, "pthread_create");
0115     }
0116 
0117     pthread_barrier_wait(&barrier);
0118 
0119     for (i = 0; i < params.nwakes; i++)
0120         if (pthread_join(td[i].worker, NULL))
0121             err(EXIT_FAILURE, "pthread_join");
0122 
0123     pthread_barrier_destroy(&barrier);
0124 }
0125 
0126 static void *blocked_workerfn(void *arg __maybe_unused)
0127 {
0128     pthread_mutex_lock(&thread_lock);
0129     threads_starting--;
0130     if (!threads_starting)
0131         pthread_cond_signal(&thread_parent);
0132     pthread_cond_wait(&thread_worker, &thread_lock);
0133     pthread_mutex_unlock(&thread_lock);
0134 
0135     while (1) { /* handle spurious wakeups */
0136         if (futex_wait(&futex, 0, NULL, futex_flag) != EINTR)
0137             break;
0138     }
0139 
0140     pthread_exit(NULL);
0141     return NULL;
0142 }
0143 
0144 static void block_threads(pthread_t *w, pthread_attr_t thread_attr,
0145               struct perf_cpu_map *cpu)
0146 {
0147     cpu_set_t *cpuset;
0148     unsigned int i;
0149     int nrcpus = perf_cpu_map__nr(cpu);
0150     size_t size;
0151 
0152     threads_starting = params.nthreads;
0153 
0154     cpuset = CPU_ALLOC(nrcpus);
0155     BUG_ON(!cpuset);
0156     size = CPU_ALLOC_SIZE(nrcpus);
0157 
0158     /* create and block all threads */
0159     for (i = 0; i < params.nthreads; i++) {
0160         CPU_ZERO_S(size, cpuset);
0161         CPU_SET_S(perf_cpu_map__cpu(cpu, i % perf_cpu_map__nr(cpu)).cpu, size, cpuset);
0162 
0163         if (pthread_attr_setaffinity_np(&thread_attr, size, cpuset)) {
0164             CPU_FREE(cpuset);
0165             err(EXIT_FAILURE, "pthread_attr_setaffinity_np");
0166         }
0167 
0168         if (pthread_create(&w[i], &thread_attr, blocked_workerfn, NULL)) {
0169             CPU_FREE(cpuset);
0170             err(EXIT_FAILURE, "pthread_create");
0171         }
0172     }
0173     CPU_FREE(cpuset);
0174 }
0175 
0176 static void print_run(struct thread_data *waking_worker, unsigned int run_num)
0177 {
0178     unsigned int i, wakeup_avg;
0179     double waketime_avg, waketime_stddev;
0180     struct stats __waketime_stats, __wakeup_stats;
0181 
0182     init_stats(&__wakeup_stats);
0183     init_stats(&__waketime_stats);
0184 
0185     for (i = 0; i < params.nwakes; i++) {
0186         update_stats(&__waketime_stats, waking_worker[i].runtime.tv_usec);
0187         update_stats(&__wakeup_stats, waking_worker[i].nwoken);
0188     }
0189 
0190     waketime_avg = avg_stats(&__waketime_stats);
0191     waketime_stddev = stddev_stats(&__waketime_stats);
0192     wakeup_avg = avg_stats(&__wakeup_stats);
0193 
0194     printf("[Run %d]: Avg per-thread latency (waking %d/%d threads) "
0195            "in %.4f ms (+-%.2f%%)\n", run_num + 1, wakeup_avg,
0196            params.nthreads, waketime_avg / USEC_PER_MSEC,
0197            rel_stddev_stats(waketime_stddev, waketime_avg));
0198 }
0199 
0200 static void print_summary(void)
0201 {
0202     unsigned int wakeup_avg;
0203     double waketime_avg, waketime_stddev;
0204 
0205     waketime_avg = avg_stats(&waketime_stats);
0206     waketime_stddev = stddev_stats(&waketime_stats);
0207     wakeup_avg = avg_stats(&wakeup_stats);
0208 
0209     printf("Avg per-thread latency (waking %d/%d threads) in %.4f ms (+-%.2f%%)\n",
0210            wakeup_avg,
0211            params.nthreads,
0212            waketime_avg / USEC_PER_MSEC,
0213            rel_stddev_stats(waketime_stddev, waketime_avg));
0214 }
0215 
0216 
0217 static void do_run_stats(struct thread_data *waking_worker)
0218 {
0219     unsigned int i;
0220 
0221     for (i = 0; i < params.nwakes; i++) {
0222         update_stats(&waketime_stats, waking_worker[i].runtime.tv_usec);
0223         update_stats(&wakeup_stats, waking_worker[i].nwoken);
0224     }
0225 
0226 }
0227 
0228 static void toggle_done(int sig __maybe_unused,
0229             siginfo_t *info __maybe_unused,
0230             void *uc __maybe_unused)
0231 {
0232     done = true;
0233 }
0234 
0235 int bench_futex_wake_parallel(int argc, const char **argv)
0236 {
0237     int ret = 0;
0238     unsigned int i, j;
0239     struct sigaction act;
0240     pthread_attr_t thread_attr;
0241     struct thread_data *waking_worker;
0242     struct perf_cpu_map *cpu;
0243 
0244     argc = parse_options(argc, argv, options,
0245                  bench_futex_wake_parallel_usage, 0);
0246     if (argc) {
0247         usage_with_options(bench_futex_wake_parallel_usage, options);
0248         exit(EXIT_FAILURE);
0249     }
0250 
0251     memset(&act, 0, sizeof(act));
0252     sigfillset(&act.sa_mask);
0253     act.sa_sigaction = toggle_done;
0254     sigaction(SIGINT, &act, NULL);
0255 
0256     if (params.mlockall) {
0257         if (mlockall(MCL_CURRENT | MCL_FUTURE))
0258             err(EXIT_FAILURE, "mlockall");
0259     }
0260 
0261     cpu = perf_cpu_map__new(NULL);
0262     if (!cpu)
0263         err(EXIT_FAILURE, "calloc");
0264 
0265     if (!params.nthreads)
0266         params.nthreads = perf_cpu_map__nr(cpu);
0267 
0268     /* some sanity checks */
0269     if (params.nwakes > params.nthreads ||
0270         !params.nwakes)
0271         params.nwakes = params.nthreads;
0272 
0273     if (params.nthreads % params.nwakes)
0274         errx(EXIT_FAILURE, "Must be perfectly divisible");
0275     /*
0276      * Each thread will wakeup nwakes tasks in
0277      * a single futex_wait call.
0278      */
0279     nwakes = params.nthreads/params.nwakes;
0280 
0281     blocked_worker = calloc(params.nthreads, sizeof(*blocked_worker));
0282     if (!blocked_worker)
0283         err(EXIT_FAILURE, "calloc");
0284 
0285     if (!params.fshared)
0286         futex_flag = FUTEX_PRIVATE_FLAG;
0287 
0288     printf("Run summary [PID %d]: blocking on %d threads (at [%s] "
0289            "futex %p), %d threads waking up %d at a time.\n\n",
0290            getpid(), params.nthreads, params.fshared ? "shared":"private",
0291            &futex, params.nwakes, nwakes);
0292 
0293     init_stats(&wakeup_stats);
0294     init_stats(&waketime_stats);
0295 
0296     pthread_attr_init(&thread_attr);
0297     pthread_mutex_init(&thread_lock, NULL);
0298     pthread_cond_init(&thread_parent, NULL);
0299     pthread_cond_init(&thread_worker, NULL);
0300 
0301     for (j = 0; j < bench_repeat && !done; j++) {
0302         waking_worker = calloc(params.nwakes, sizeof(*waking_worker));
0303         if (!waking_worker)
0304             err(EXIT_FAILURE, "calloc");
0305 
0306         /* create, launch & block all threads */
0307         block_threads(blocked_worker, thread_attr, cpu);
0308 
0309         /* make sure all threads are already blocked */
0310         pthread_mutex_lock(&thread_lock);
0311         while (threads_starting)
0312             pthread_cond_wait(&thread_parent, &thread_lock);
0313         pthread_cond_broadcast(&thread_worker);
0314         pthread_mutex_unlock(&thread_lock);
0315 
0316         usleep(100000);
0317 
0318         /* Ok, all threads are patiently blocked, start waking folks up */
0319         wakeup_threads(waking_worker, thread_attr);
0320 
0321         for (i = 0; i < params.nthreads; i++) {
0322             ret = pthread_join(blocked_worker[i], NULL);
0323             if (ret)
0324                 err(EXIT_FAILURE, "pthread_join");
0325         }
0326 
0327         do_run_stats(waking_worker);
0328         if (!params.silent)
0329             print_run(waking_worker, j);
0330 
0331         free(waking_worker);
0332     }
0333 
0334     /* cleanup & report results */
0335     pthread_cond_destroy(&thread_parent);
0336     pthread_cond_destroy(&thread_worker);
0337     pthread_mutex_destroy(&thread_lock);
0338     pthread_attr_destroy(&thread_attr);
0339 
0340     print_summary();
0341 
0342     free(blocked_worker);
0343     perf_cpu_map__put(cpu);
0344     return ret;
0345 }
0346 #endif /* HAVE_PTHREAD_BARRIER */