Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * ring buffer tester and benchmark
0004  *
0005  * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
0006  */
0007 #include <linux/ring_buffer.h>
0008 #include <linux/completion.h>
0009 #include <linux/kthread.h>
0010 #include <uapi/linux/sched/types.h>
0011 #include <linux/module.h>
0012 #include <linux/ktime.h>
0013 #include <asm/local.h>
0014 
/*
 * Layout that read_page() overlays on the page filled in by
 * ring_buffer_read_page() so that it can walk the events stored there.
 */
struct rb_page {
    /* not referenced anywhere in this file */
    u64     ts;
    /* read_page() masks this with 0xfffff and uses it as the number of data bytes to walk */
    local_t     commit;
    /* packed events; read_page() addresses them by byte offset */
    char        data[4080];
};
0020 
/* run time and sleep time in seconds */
#define RUN_TIME    10ULL
#define SLEEP_TIME  10

/*
 * The producer wakes the consumer once every wakeup_interval passes of its
 * outer loop. Each pass makes write_iteration write attempts, so this counts
 * batches of writes, not individual events.
 */
static int wakeup_interval = 100;

/*
 * Set to 1 by the producer to ask the consumer to end the current run;
 * cleared by the consumer right before it completes read_done.
 */
static int reader_finish;
/* completed by the consumer thread at the start of each of its runs */
static DECLARE_COMPLETION(read_start);
/* completed by ring_buffer_consumer() once it has stopped reading */
static DECLARE_COMPLETION(read_done);

/* the buffer under test: allocated at module init, freed at module exit */
static struct trace_buffer *buffer;
static struct task_struct *producer;
/* never assigned (stays NULL) when disable_reader is set at module init */
static struct task_struct *consumer;
/* events counted by the consumer; zeroed at the start of every consumer run */
static unsigned long read;
0036 
0037 static unsigned int disable_reader;
0038 module_param(disable_reader, uint, 0644);
0039 MODULE_PARM_DESC(disable_reader, "only run producer");
0040 
0041 static unsigned int write_iteration = 50;
0042 module_param(write_iteration, uint, 0644);
0043 MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");
0044 
0045 static int producer_nice = MAX_NICE;
0046 static int consumer_nice = MAX_NICE;
0047 
0048 static int producer_fifo;
0049 static int consumer_fifo;
0050 
0051 module_param(producer_nice, int, 0644);
0052 MODULE_PARM_DESC(producer_nice, "nice prio for producer");
0053 
0054 module_param(consumer_nice, int, 0644);
0055 MODULE_PARM_DESC(consumer_nice, "nice prio for consumer");
0056 
0057 module_param(producer_fifo, int, 0644);
0058 MODULE_PARM_DESC(producer_fifo, "use fifo for producer: 0 - disabled, 1 - low prio, 2 - fifo");
0059 
0060 module_param(consumer_fifo, int, 0644);
0061 MODULE_PARM_DESC(consumer_fifo, "use fifo for consumer: 0 - disabled, 1 - low prio, 2 - fifo");
0062 
/*
 * Flipped at the start of every ring_buffer_consumer() run:
 * non-zero selects read_event(), zero selects read_page().
 */
static int read_events;

/*
 * Latched to 1 by the first TEST_ERROR(). Nothing in this file clears it,
 * so from then on break_test() returns true.
 */
static int test_error;

/*
 * Record a test failure. Only the first invocation sets test_error and
 * triggers WARN_ON(1); later invocations do nothing.
 */
#define TEST_ERROR()                \
    do {                    \
        if (!test_error) {      \
            test_error = 1;     \
            WARN_ON(1);     \
        }               \
    } while (0)
0074 
/* Outcome of one read attempt on one CPU, as returned by read_event() and read_page(). */
enum event_status {
    /* the read call returned data */
    EVENT_FOUND,
    /* no data was obtained; read_event() also returns this when its payload check fails */
    EVENT_DROPPED,
};
0079 
0080 static bool break_test(void)
0081 {
0082     return test_error || kthread_should_stop();
0083 }
0084 
0085 static enum event_status read_event(int cpu)
0086 {
0087     struct ring_buffer_event *event;
0088     int *entry;
0089     u64 ts;
0090 
0091     event = ring_buffer_consume(buffer, cpu, &ts, NULL);
0092     if (!event)
0093         return EVENT_DROPPED;
0094 
0095     entry = ring_buffer_event_data(event);
0096     if (*entry != cpu) {
0097         TEST_ERROR();
0098         return EVENT_DROPPED;
0099     }
0100 
0101     read++;
0102     return EVENT_FOUND;
0103 }
0104 
0105 static enum event_status read_page(int cpu)
0106 {
0107     struct ring_buffer_event *event;
0108     struct rb_page *rpage;
0109     unsigned long commit;
0110     void *bpage;
0111     int *entry;
0112     int ret;
0113     int inc;
0114     int i;
0115 
0116     bpage = ring_buffer_alloc_read_page(buffer, cpu);
0117     if (IS_ERR(bpage))
0118         return EVENT_DROPPED;
0119 
0120     ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
0121     if (ret >= 0) {
0122         rpage = bpage;
0123         /* The commit may have missed event flags set, clear them */
0124         commit = local_read(&rpage->commit) & 0xfffff;
0125         for (i = 0; i < commit && !test_error ; i += inc) {
0126 
0127             if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
0128                 TEST_ERROR();
0129                 break;
0130             }
0131 
0132             inc = -1;
0133             event = (void *)&rpage->data[i];
0134             switch (event->type_len) {
0135             case RINGBUF_TYPE_PADDING:
0136                 /* failed writes may be discarded events */
0137                 if (!event->time_delta)
0138                     TEST_ERROR();
0139                 inc = event->array[0] + 4;
0140                 break;
0141             case RINGBUF_TYPE_TIME_EXTEND:
0142                 inc = 8;
0143                 break;
0144             case 0:
0145                 entry = ring_buffer_event_data(event);
0146                 if (*entry != cpu) {
0147                     TEST_ERROR();
0148                     break;
0149                 }
0150                 read++;
0151                 if (!event->array[0]) {
0152                     TEST_ERROR();
0153                     break;
0154                 }
0155                 inc = event->array[0] + 4;
0156                 break;
0157             default:
0158                 entry = ring_buffer_event_data(event);
0159                 if (*entry != cpu) {
0160                     TEST_ERROR();
0161                     break;
0162                 }
0163                 read++;
0164                 inc = ((event->type_len + 1) * 4);
0165             }
0166             if (test_error)
0167                 break;
0168 
0169             if (inc <= 0) {
0170                 TEST_ERROR();
0171                 break;
0172             }
0173         }
0174     }
0175     ring_buffer_free_read_page(buffer, cpu, bpage);
0176 
0177     if (ret < 0)
0178         return EVENT_DROPPED;
0179     return EVENT_FOUND;
0180 }
0181 
0182 static void ring_buffer_consumer(void)
0183 {
0184     /* toggle between reading pages and events */
0185     read_events ^= 1;
0186 
0187     read = 0;
0188     /*
0189      * Continue running until the producer specifically asks to stop
0190      * and is ready for the completion.
0191      */
0192     while (!READ_ONCE(reader_finish)) {
0193         int found = 1;
0194 
0195         while (found && !test_error) {
0196             int cpu;
0197 
0198             found = 0;
0199             for_each_online_cpu(cpu) {
0200                 enum event_status stat;
0201 
0202                 if (read_events)
0203                     stat = read_event(cpu);
0204                 else
0205                     stat = read_page(cpu);
0206 
0207                 if (test_error)
0208                     break;
0209 
0210                 if (stat == EVENT_FOUND)
0211                     found = 1;
0212 
0213             }
0214         }
0215 
0216         /* Wait till the producer wakes us up when there is more data
0217          * available or when the producer wants us to finish reading.
0218          */
0219         set_current_state(TASK_INTERRUPTIBLE);
0220         if (reader_finish)
0221             break;
0222 
0223         schedule();
0224     }
0225     __set_current_state(TASK_RUNNING);
0226     reader_finish = 0;
0227     complete(&read_done);
0228 }
0229 
0230 static void ring_buffer_producer(void)
0231 {
0232     ktime_t start_time, end_time, timeout;
0233     unsigned long long time;
0234     unsigned long long entries;
0235     unsigned long long overruns;
0236     unsigned long missed = 0;
0237     unsigned long hit = 0;
0238     unsigned long avg;
0239     int cnt = 0;
0240 
0241     /*
0242      * Hammer the buffer for 10 secs (this may
0243      * make the system stall)
0244      */
0245     trace_printk("Starting ring buffer hammer\n");
0246     start_time = ktime_get();
0247     timeout = ktime_add_ns(start_time, RUN_TIME * NSEC_PER_SEC);
0248     do {
0249         struct ring_buffer_event *event;
0250         int *entry;
0251         int i;
0252 
0253         for (i = 0; i < write_iteration; i++) {
0254             event = ring_buffer_lock_reserve(buffer, 10);
0255             if (!event) {
0256                 missed++;
0257             } else {
0258                 hit++;
0259                 entry = ring_buffer_event_data(event);
0260                 *entry = smp_processor_id();
0261                 ring_buffer_unlock_commit(buffer, event);
0262             }
0263         }
0264         end_time = ktime_get();
0265 
0266         cnt++;
0267         if (consumer && !(cnt % wakeup_interval))
0268             wake_up_process(consumer);
0269 
0270 #ifndef CONFIG_PREEMPTION
0271         /*
0272          * If we are a non preempt kernel, the 10 seconds run will
0273          * stop everything while it runs. Instead, we will call
0274          * cond_resched and also add any time that was lost by a
0275          * reschedule.
0276          *
0277          * Do a cond resched at the same frequency we would wake up
0278          * the reader.
0279          */
0280         if (cnt % wakeup_interval)
0281             cond_resched();
0282 #endif
0283     } while (ktime_before(end_time, timeout) && !break_test());
0284     trace_printk("End ring buffer hammer\n");
0285 
0286     if (consumer) {
0287         /* Init both completions here to avoid races */
0288         init_completion(&read_start);
0289         init_completion(&read_done);
0290         /* the completions must be visible before the finish var */
0291         smp_wmb();
0292         reader_finish = 1;
0293         wake_up_process(consumer);
0294         wait_for_completion(&read_done);
0295     }
0296 
0297     time = ktime_us_delta(end_time, start_time);
0298 
0299     entries = ring_buffer_entries(buffer);
0300     overruns = ring_buffer_overruns(buffer);
0301 
0302     if (test_error)
0303         trace_printk("ERROR!\n");
0304 
0305     if (!disable_reader) {
0306         if (consumer_fifo)
0307             trace_printk("Running Consumer at SCHED_FIFO %s\n",
0308                      consumer_fifo == 1 ? "low" : "high");
0309         else
0310             trace_printk("Running Consumer at nice: %d\n",
0311                      consumer_nice);
0312     }
0313     if (producer_fifo)
0314         trace_printk("Running Producer at SCHED_FIFO %s\n",
0315                  producer_fifo == 1 ? "low" : "high");
0316     else
0317         trace_printk("Running Producer at nice: %d\n",
0318                  producer_nice);
0319 
0320     /* Let the user know that the test is running at low priority */
0321     if (!producer_fifo && !consumer_fifo &&
0322         producer_nice == MAX_NICE && consumer_nice == MAX_NICE)
0323         trace_printk("WARNING!!! This test is running at lowest priority.\n");
0324 
0325     trace_printk("Time:     %lld (usecs)\n", time);
0326     trace_printk("Overruns: %lld\n", overruns);
0327     if (disable_reader)
0328         trace_printk("Read:     (reader disabled)\n");
0329     else
0330         trace_printk("Read:     %ld  (by %s)\n", read,
0331             read_events ? "events" : "pages");
0332     trace_printk("Entries:  %lld\n", entries);
0333     trace_printk("Total:    %lld\n", entries + overruns + read);
0334     trace_printk("Missed:   %ld\n", missed);
0335     trace_printk("Hit:      %ld\n", hit);
0336 
0337     /* Convert time from usecs to millisecs */
0338     do_div(time, USEC_PER_MSEC);
0339     if (time)
0340         hit /= (long)time;
0341     else
0342         trace_printk("TIME IS ZERO??\n");
0343 
0344     trace_printk("Entries per millisec: %ld\n", hit);
0345 
0346     if (hit) {
0347         /* Calculate the average time in nanosecs */
0348         avg = NSEC_PER_MSEC / hit;
0349         trace_printk("%ld ns per entry\n", avg);
0350     }
0351 
0352     if (missed) {
0353         if (time)
0354             missed /= (long)time;
0355 
0356         trace_printk("Total iterations per millisec: %ld\n",
0357                  hit + missed);
0358 
0359         /* it is possible that hit + missed will overflow and be zero */
0360         if (!(hit + missed)) {
0361             trace_printk("hit + missed overflowed and totalled zero!\n");
0362             hit--; /* make it non zero */
0363         }
0364 
0365         /* Calculate the average time in nanosecs */
0366         avg = NSEC_PER_MSEC / (hit + missed);
0367         trace_printk("%ld ns per entry\n", avg);
0368     }
0369 }
0370 
0371 static void wait_to_die(void)
0372 {
0373     set_current_state(TASK_INTERRUPTIBLE);
0374     while (!kthread_should_stop()) {
0375         schedule();
0376         set_current_state(TASK_INTERRUPTIBLE);
0377     }
0378     __set_current_state(TASK_RUNNING);
0379 }
0380 
0381 static int ring_buffer_consumer_thread(void *arg)
0382 {
0383     while (!break_test()) {
0384         complete(&read_start);
0385 
0386         ring_buffer_consumer();
0387 
0388         set_current_state(TASK_INTERRUPTIBLE);
0389         if (break_test())
0390             break;
0391         schedule();
0392     }
0393     __set_current_state(TASK_RUNNING);
0394 
0395     if (!kthread_should_stop())
0396         wait_to_die();
0397 
0398     return 0;
0399 }
0400 
0401 static int ring_buffer_producer_thread(void *arg)
0402 {
0403     while (!break_test()) {
0404         ring_buffer_reset(buffer);
0405 
0406         if (consumer) {
0407             wake_up_process(consumer);
0408             wait_for_completion(&read_start);
0409         }
0410 
0411         ring_buffer_producer();
0412         if (break_test())
0413             goto out_kill;
0414 
0415         trace_printk("Sleeping for 10 secs\n");
0416         set_current_state(TASK_INTERRUPTIBLE);
0417         if (break_test())
0418             goto out_kill;
0419         schedule_timeout(HZ * SLEEP_TIME);
0420     }
0421 
0422 out_kill:
0423     __set_current_state(TASK_RUNNING);
0424     if (!kthread_should_stop())
0425         wait_to_die();
0426 
0427     return 0;
0428 }
0429 
0430 static int __init ring_buffer_benchmark_init(void)
0431 {
0432     int ret;
0433 
0434     /* make a one meg buffer in overwite mode */
0435     buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
0436     if (!buffer)
0437         return -ENOMEM;
0438 
0439     if (!disable_reader) {
0440         consumer = kthread_create(ring_buffer_consumer_thread,
0441                       NULL, "rb_consumer");
0442         ret = PTR_ERR(consumer);
0443         if (IS_ERR(consumer))
0444             goto out_fail;
0445     }
0446 
0447     producer = kthread_run(ring_buffer_producer_thread,
0448                    NULL, "rb_producer");
0449     ret = PTR_ERR(producer);
0450 
0451     if (IS_ERR(producer))
0452         goto out_kill;
0453 
0454     /*
0455      * Run them as low-prio background tasks by default:
0456      */
0457     if (!disable_reader) {
0458         if (consumer_fifo >= 2)
0459             sched_set_fifo(consumer);
0460         else if (consumer_fifo == 1)
0461             sched_set_fifo_low(consumer);
0462         else
0463             set_user_nice(consumer, consumer_nice);
0464     }
0465 
0466     if (producer_fifo >= 2)
0467         sched_set_fifo(producer);
0468     else if (producer_fifo == 1)
0469         sched_set_fifo_low(producer);
0470     else
0471         set_user_nice(producer, producer_nice);
0472 
0473     return 0;
0474 
0475  out_kill:
0476     if (consumer)
0477         kthread_stop(consumer);
0478 
0479  out_fail:
0480     ring_buffer_free(buffer);
0481     return ret;
0482 }
0483 
/*
 * Module exit: stop the producer thread first, then the consumer thread if
 * one was created, and finally free the buffer both of them used.
 */
static void __exit ring_buffer_benchmark_exit(void)
{
    kthread_stop(producer);
    /* consumer is NULL when the module was loaded with disable_reader set */
    if (consumer)
        kthread_stop(consumer);
    ring_buffer_free(buffer);
}
0491 
/* Register the module's entry and exit points. */
module_init(ring_buffer_benchmark_init);
module_exit(ring_buffer_benchmark_exit);

/* Module metadata. */
MODULE_AUTHOR("Steven Rostedt");
MODULE_DESCRIPTION("ring_buffer_benchmark");
MODULE_LICENSE("GPL");