Back to home page

OSCL-LXR

 
 

    


0001 /* SPDX-License-Identifier: GPL-2.0-or-later */
0002 /*
0003  *  Definitions for the 'struct ptr_ring' datastructure.
0004  *
0005  *  Author:
0006  *      Michael S. Tsirkin <mst@redhat.com>
0007  *
0008  *  Copyright (C) 2016 Red Hat, Inc.
0009  *
0010  *  This is a limited-size FIFO maintaining pointers in FIFO order, with
0011  *  one CPU producing entries and another consuming entries from a FIFO.
0012  *
0013  *  This implementation tries to minimize cache-contention when there is a
0014  *  single producer and a single consumer CPU.
0015  */
0016 
0017 #ifndef _LINUX_PTR_RING_H
0018 #define _LINUX_PTR_RING_H 1
0019 
0020 #ifdef __KERNEL__
0021 #include <linux/spinlock.h>
0022 #include <linux/cache.h>
0023 #include <linux/types.h>
0024 #include <linux/compiler.h>
0025 #include <linux/slab.h>
0026 #include <linux/mm.h>
0027 #include <asm/errno.h>
0028 #endif
0029 
0030 struct ptr_ring {
0031     int producer ____cacheline_aligned_in_smp;
0032     spinlock_t producer_lock;
0033     int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
0034     int consumer_tail; /* next entry to invalidate */
0035     spinlock_t consumer_lock;
0036     /* Shared consumer/producer data */
0037     /* Read-only by both the producer and the consumer */
0038     int size ____cacheline_aligned_in_smp; /* max entries in queue */
0039     int batch; /* number of entries to consume in a batch */
0040     void **queue;
0041 };
0042 
0043 /* Note: callers invoking this in a loop must use a compiler barrier,
0044  * for example cpu_relax().
0045  *
0046  * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
0047  * see e.g. ptr_ring_full.
0048  */
0049 static inline bool __ptr_ring_full(struct ptr_ring *r)
0050 {
0051     return r->queue[r->producer];
0052 }
0053 
0054 static inline bool ptr_ring_full(struct ptr_ring *r)
0055 {
0056     bool ret;
0057 
0058     spin_lock(&r->producer_lock);
0059     ret = __ptr_ring_full(r);
0060     spin_unlock(&r->producer_lock);
0061 
0062     return ret;
0063 }
0064 
0065 static inline bool ptr_ring_full_irq(struct ptr_ring *r)
0066 {
0067     bool ret;
0068 
0069     spin_lock_irq(&r->producer_lock);
0070     ret = __ptr_ring_full(r);
0071     spin_unlock_irq(&r->producer_lock);
0072 
0073     return ret;
0074 }
0075 
0076 static inline bool ptr_ring_full_any(struct ptr_ring *r)
0077 {
0078     unsigned long flags;
0079     bool ret;
0080 
0081     spin_lock_irqsave(&r->producer_lock, flags);
0082     ret = __ptr_ring_full(r);
0083     spin_unlock_irqrestore(&r->producer_lock, flags);
0084 
0085     return ret;
0086 }
0087 
0088 static inline bool ptr_ring_full_bh(struct ptr_ring *r)
0089 {
0090     bool ret;
0091 
0092     spin_lock_bh(&r->producer_lock);
0093     ret = __ptr_ring_full(r);
0094     spin_unlock_bh(&r->producer_lock);
0095 
0096     return ret;
0097 }
0098 
0099 /* Note: callers invoking this in a loop must use a compiler barrier,
0100  * for example cpu_relax(). Callers must hold producer_lock.
0101  * Callers are responsible for making sure pointer that is being queued
0102  * points to a valid data.
0103  */
0104 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
0105 {
0106     if (unlikely(!r->size) || r->queue[r->producer])
0107         return -ENOSPC;
0108 
0109     /* Make sure the pointer we are storing points to a valid data. */
0110     /* Pairs with the dependency ordering in __ptr_ring_consume. */
0111     smp_wmb();
0112 
0113     WRITE_ONCE(r->queue[r->producer++], ptr);
0114     if (unlikely(r->producer >= r->size))
0115         r->producer = 0;
0116     return 0;
0117 }
0118 
0119 /*
0120  * Note: resize (below) nests producer lock within consumer lock, so if you
0121  * consume in interrupt or BH context, you must disable interrupts/BH when
0122  * calling this.
0123  */
0124 static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
0125 {
0126     int ret;
0127 
0128     spin_lock(&r->producer_lock);
0129     ret = __ptr_ring_produce(r, ptr);
0130     spin_unlock(&r->producer_lock);
0131 
0132     return ret;
0133 }
0134 
0135 static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
0136 {
0137     int ret;
0138 
0139     spin_lock_irq(&r->producer_lock);
0140     ret = __ptr_ring_produce(r, ptr);
0141     spin_unlock_irq(&r->producer_lock);
0142 
0143     return ret;
0144 }
0145 
0146 static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
0147 {
0148     unsigned long flags;
0149     int ret;
0150 
0151     spin_lock_irqsave(&r->producer_lock, flags);
0152     ret = __ptr_ring_produce(r, ptr);
0153     spin_unlock_irqrestore(&r->producer_lock, flags);
0154 
0155     return ret;
0156 }
0157 
0158 static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
0159 {
0160     int ret;
0161 
0162     spin_lock_bh(&r->producer_lock);
0163     ret = __ptr_ring_produce(r, ptr);
0164     spin_unlock_bh(&r->producer_lock);
0165 
0166     return ret;
0167 }
0168 
0169 static inline void *__ptr_ring_peek(struct ptr_ring *r)
0170 {
0171     if (likely(r->size))
0172         return READ_ONCE(r->queue[r->consumer_head]);
0173     return NULL;
0174 }
0175 
0176 /*
0177  * Test ring empty status without taking any locks.
0178  *
0179  * NB: This is only safe to call if ring is never resized.
0180  *
0181  * However, if some other CPU consumes ring entries at the same time, the value
0182  * returned is not guaranteed to be correct.
0183  *
0184  * In this case - to avoid incorrectly detecting the ring
0185  * as empty - the CPU consuming the ring entries is responsible
0186  * for either consuming all ring entries until the ring is empty,
0187  * or synchronizing with some other CPU and causing it to
0188  * re-test __ptr_ring_empty and/or consume the ring enteries
0189  * after the synchronization point.
0190  *
0191  * Note: callers invoking this in a loop must use a compiler barrier,
0192  * for example cpu_relax().
0193  */
0194 static inline bool __ptr_ring_empty(struct ptr_ring *r)
0195 {
0196     if (likely(r->size))
0197         return !r->queue[READ_ONCE(r->consumer_head)];
0198     return true;
0199 }
0200 
0201 static inline bool ptr_ring_empty(struct ptr_ring *r)
0202 {
0203     bool ret;
0204 
0205     spin_lock(&r->consumer_lock);
0206     ret = __ptr_ring_empty(r);
0207     spin_unlock(&r->consumer_lock);
0208 
0209     return ret;
0210 }
0211 
0212 static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
0213 {
0214     bool ret;
0215 
0216     spin_lock_irq(&r->consumer_lock);
0217     ret = __ptr_ring_empty(r);
0218     spin_unlock_irq(&r->consumer_lock);
0219 
0220     return ret;
0221 }
0222 
0223 static inline bool ptr_ring_empty_any(struct ptr_ring *r)
0224 {
0225     unsigned long flags;
0226     bool ret;
0227 
0228     spin_lock_irqsave(&r->consumer_lock, flags);
0229     ret = __ptr_ring_empty(r);
0230     spin_unlock_irqrestore(&r->consumer_lock, flags);
0231 
0232     return ret;
0233 }
0234 
0235 static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
0236 {
0237     bool ret;
0238 
0239     spin_lock_bh(&r->consumer_lock);
0240     ret = __ptr_ring_empty(r);
0241     spin_unlock_bh(&r->consumer_lock);
0242 
0243     return ret;
0244 }
0245 
0246 /* Must only be called after __ptr_ring_peek returned !NULL */
0247 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
0248 {
0249     /* Fundamentally, what we want to do is update consumer
0250      * index and zero out the entry so producer can reuse it.
0251      * Doing it naively at each consume would be as simple as:
0252      *       consumer = r->consumer;
0253      *       r->queue[consumer++] = NULL;
0254      *       if (unlikely(consumer >= r->size))
0255      *               consumer = 0;
0256      *       r->consumer = consumer;
0257      * but that is suboptimal when the ring is full as producer is writing
0258      * out new entries in the same cache line.  Defer these updates until a
0259      * batch of entries has been consumed.
0260      */
0261     /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
0262      * to work correctly.
0263      */
0264     int consumer_head = r->consumer_head;
0265     int head = consumer_head++;
0266 
0267     /* Once we have processed enough entries invalidate them in
0268      * the ring all at once so producer can reuse their space in the ring.
0269      * We also do this when we reach end of the ring - not mandatory
0270      * but helps keep the implementation simple.
0271      */
0272     if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
0273              consumer_head >= r->size)) {
0274         /* Zero out entries in the reverse order: this way we touch the
0275          * cache line that producer might currently be reading the last;
0276          * producer won't make progress and touch other cache lines
0277          * besides the first one until we write out all entries.
0278          */
0279         while (likely(head >= r->consumer_tail))
0280             r->queue[head--] = NULL;
0281         r->consumer_tail = consumer_head;
0282     }
0283     if (unlikely(consumer_head >= r->size)) {
0284         consumer_head = 0;
0285         r->consumer_tail = 0;
0286     }
0287     /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
0288     WRITE_ONCE(r->consumer_head, consumer_head);
0289 }
0290 
0291 static inline void *__ptr_ring_consume(struct ptr_ring *r)
0292 {
0293     void *ptr;
0294 
0295     /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
0296      * accessing data through the pointer is up to date. Pairs
0297      * with smp_wmb in __ptr_ring_produce.
0298      */
0299     ptr = __ptr_ring_peek(r);
0300     if (ptr)
0301         __ptr_ring_discard_one(r);
0302 
0303     return ptr;
0304 }
0305 
0306 static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
0307                          void **array, int n)
0308 {
0309     void *ptr;
0310     int i;
0311 
0312     for (i = 0; i < n; i++) {
0313         ptr = __ptr_ring_consume(r);
0314         if (!ptr)
0315             break;
0316         array[i] = ptr;
0317     }
0318 
0319     return i;
0320 }
0321 
0322 /*
0323  * Note: resize (below) nests producer lock within consumer lock, so if you
0324  * call this in interrupt or BH context, you must disable interrupts/BH when
0325  * producing.
0326  */
0327 static inline void *ptr_ring_consume(struct ptr_ring *r)
0328 {
0329     void *ptr;
0330 
0331     spin_lock(&r->consumer_lock);
0332     ptr = __ptr_ring_consume(r);
0333     spin_unlock(&r->consumer_lock);
0334 
0335     return ptr;
0336 }
0337 
0338 static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
0339 {
0340     void *ptr;
0341 
0342     spin_lock_irq(&r->consumer_lock);
0343     ptr = __ptr_ring_consume(r);
0344     spin_unlock_irq(&r->consumer_lock);
0345 
0346     return ptr;
0347 }
0348 
0349 static inline void *ptr_ring_consume_any(struct ptr_ring *r)
0350 {
0351     unsigned long flags;
0352     void *ptr;
0353 
0354     spin_lock_irqsave(&r->consumer_lock, flags);
0355     ptr = __ptr_ring_consume(r);
0356     spin_unlock_irqrestore(&r->consumer_lock, flags);
0357 
0358     return ptr;
0359 }
0360 
0361 static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
0362 {
0363     void *ptr;
0364 
0365     spin_lock_bh(&r->consumer_lock);
0366     ptr = __ptr_ring_consume(r);
0367     spin_unlock_bh(&r->consumer_lock);
0368 
0369     return ptr;
0370 }
0371 
0372 static inline int ptr_ring_consume_batched(struct ptr_ring *r,
0373                        void **array, int n)
0374 {
0375     int ret;
0376 
0377     spin_lock(&r->consumer_lock);
0378     ret = __ptr_ring_consume_batched(r, array, n);
0379     spin_unlock(&r->consumer_lock);
0380 
0381     return ret;
0382 }
0383 
0384 static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
0385                            void **array, int n)
0386 {
0387     int ret;
0388 
0389     spin_lock_irq(&r->consumer_lock);
0390     ret = __ptr_ring_consume_batched(r, array, n);
0391     spin_unlock_irq(&r->consumer_lock);
0392 
0393     return ret;
0394 }
0395 
0396 static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
0397                            void **array, int n)
0398 {
0399     unsigned long flags;
0400     int ret;
0401 
0402     spin_lock_irqsave(&r->consumer_lock, flags);
0403     ret = __ptr_ring_consume_batched(r, array, n);
0404     spin_unlock_irqrestore(&r->consumer_lock, flags);
0405 
0406     return ret;
0407 }
0408 
0409 static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
0410                           void **array, int n)
0411 {
0412     int ret;
0413 
0414     spin_lock_bh(&r->consumer_lock);
0415     ret = __ptr_ring_consume_batched(r, array, n);
0416     spin_unlock_bh(&r->consumer_lock);
0417 
0418     return ret;
0419 }
0420 
0421 /* Cast to structure type and call a function without discarding from FIFO.
0422  * Function must return a value.
0423  * Callers must take consumer_lock.
0424  */
0425 #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
0426 
0427 #define PTR_RING_PEEK_CALL(r, f) ({ \
0428     typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
0429     \
0430     spin_lock(&(r)->consumer_lock); \
0431     __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
0432     spin_unlock(&(r)->consumer_lock); \
0433     __PTR_RING_PEEK_CALL_v; \
0434 })
0435 
0436 #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
0437     typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
0438     \
0439     spin_lock_irq(&(r)->consumer_lock); \
0440     __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
0441     spin_unlock_irq(&(r)->consumer_lock); \
0442     __PTR_RING_PEEK_CALL_v; \
0443 })
0444 
0445 #define PTR_RING_PEEK_CALL_BH(r, f) ({ \
0446     typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
0447     \
0448     spin_lock_bh(&(r)->consumer_lock); \
0449     __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
0450     spin_unlock_bh(&(r)->consumer_lock); \
0451     __PTR_RING_PEEK_CALL_v; \
0452 })
0453 
0454 #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
0455     typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
0456     unsigned long __PTR_RING_PEEK_CALL_f;\
0457     \
0458     spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
0459     __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
0460     spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
0461     __PTR_RING_PEEK_CALL_v; \
0462 })
0463 
0464 /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
0465  * documentation for vmalloc for which of them are legal.
0466  */
0467 static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
0468 {
0469     if (size > KMALLOC_MAX_SIZE / sizeof(void *))
0470         return NULL;
0471     return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
0472 }
0473 
0474 static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
0475 {
0476     r->size = size;
0477     r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
0478     /* We need to set batch at least to 1 to make logic
0479      * in __ptr_ring_discard_one work correctly.
0480      * Batching too much (because ring is small) would cause a lot of
0481      * burstiness. Needs tuning, for now disable batching.
0482      */
0483     if (r->batch > r->size / 2 || !r->batch)
0484         r->batch = 1;
0485 }
0486 
0487 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
0488 {
0489     r->queue = __ptr_ring_init_queue_alloc(size, gfp);
0490     if (!r->queue)
0491         return -ENOMEM;
0492 
0493     __ptr_ring_set_size(r, size);
0494     r->producer = r->consumer_head = r->consumer_tail = 0;
0495     spin_lock_init(&r->producer_lock);
0496     spin_lock_init(&r->consumer_lock);
0497 
0498     return 0;
0499 }
0500 
0501 /*
0502  * Return entries into ring. Destroy entries that don't fit.
0503  *
0504  * Note: this is expected to be a rare slow path operation.
0505  *
0506  * Note: producer lock is nested within consumer lock, so if you
0507  * resize you must make sure all uses nest correctly.
0508  * In particular if you consume ring in interrupt or BH context, you must
0509  * disable interrupts/BH when doing so.
0510  */
0511 static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
0512                       void (*destroy)(void *))
0513 {
0514     unsigned long flags;
0515     int head;
0516 
0517     spin_lock_irqsave(&r->consumer_lock, flags);
0518     spin_lock(&r->producer_lock);
0519 
0520     if (!r->size)
0521         goto done;
0522 
0523     /*
0524      * Clean out buffered entries (for simplicity). This way following code
0525      * can test entries for NULL and if not assume they are valid.
0526      */
0527     head = r->consumer_head - 1;
0528     while (likely(head >= r->consumer_tail))
0529         r->queue[head--] = NULL;
0530     r->consumer_tail = r->consumer_head;
0531 
0532     /*
0533      * Go over entries in batch, start moving head back and copy entries.
0534      * Stop when we run into previously unconsumed entries.
0535      */
0536     while (n) {
0537         head = r->consumer_head - 1;
0538         if (head < 0)
0539             head = r->size - 1;
0540         if (r->queue[head]) {
0541             /* This batch entry will have to be destroyed. */
0542             goto done;
0543         }
0544         r->queue[head] = batch[--n];
0545         r->consumer_tail = head;
0546         /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
0547         WRITE_ONCE(r->consumer_head, head);
0548     }
0549 
0550 done:
0551     /* Destroy all entries left in the batch. */
0552     while (n)
0553         destroy(batch[--n]);
0554     spin_unlock(&r->producer_lock);
0555     spin_unlock_irqrestore(&r->consumer_lock, flags);
0556 }
0557 
0558 static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
0559                        int size, gfp_t gfp,
0560                        void (*destroy)(void *))
0561 {
0562     int producer = 0;
0563     void **old;
0564     void *ptr;
0565 
0566     while ((ptr = __ptr_ring_consume(r)))
0567         if (producer < size)
0568             queue[producer++] = ptr;
0569         else if (destroy)
0570             destroy(ptr);
0571 
0572     if (producer >= size)
0573         producer = 0;
0574     __ptr_ring_set_size(r, size);
0575     r->producer = producer;
0576     r->consumer_head = 0;
0577     r->consumer_tail = 0;
0578     old = r->queue;
0579     r->queue = queue;
0580 
0581     return old;
0582 }
0583 
0584 /*
0585  * Note: producer lock is nested within consumer lock, so if you
0586  * resize you must make sure all uses nest correctly.
0587  * In particular if you consume ring in interrupt or BH context, you must
0588  * disable interrupts/BH when doing so.
0589  */
0590 static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
0591                   void (*destroy)(void *))
0592 {
0593     unsigned long flags;
0594     void **queue = __ptr_ring_init_queue_alloc(size, gfp);
0595     void **old;
0596 
0597     if (!queue)
0598         return -ENOMEM;
0599 
0600     spin_lock_irqsave(&(r)->consumer_lock, flags);
0601     spin_lock(&(r)->producer_lock);
0602 
0603     old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
0604 
0605     spin_unlock(&(r)->producer_lock);
0606     spin_unlock_irqrestore(&(r)->consumer_lock, flags);
0607 
0608     kvfree(old);
0609 
0610     return 0;
0611 }
0612 
0613 /*
0614  * Note: producer lock is nested within consumer lock, so if you
0615  * resize you must make sure all uses nest correctly.
0616  * In particular if you consume ring in interrupt or BH context, you must
0617  * disable interrupts/BH when doing so.
0618  */
0619 static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
0620                        unsigned int nrings,
0621                        int size,
0622                        gfp_t gfp, void (*destroy)(void *))
0623 {
0624     unsigned long flags;
0625     void ***queues;
0626     int i;
0627 
0628     queues = kmalloc_array(nrings, sizeof(*queues), gfp);
0629     if (!queues)
0630         goto noqueues;
0631 
0632     for (i = 0; i < nrings; ++i) {
0633         queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
0634         if (!queues[i])
0635             goto nomem;
0636     }
0637 
0638     for (i = 0; i < nrings; ++i) {
0639         spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
0640         spin_lock(&(rings[i])->producer_lock);
0641         queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
0642                           size, gfp, destroy);
0643         spin_unlock(&(rings[i])->producer_lock);
0644         spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
0645     }
0646 
0647     for (i = 0; i < nrings; ++i)
0648         kvfree(queues[i]);
0649 
0650     kfree(queues);
0651 
0652     return 0;
0653 
0654 nomem:
0655     while (--i >= 0)
0656         kvfree(queues[i]);
0657 
0658     kfree(queues);
0659 
0660 noqueues:
0661     return -ENOMEM;
0662 }
0663 
0664 static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
0665 {
0666     void *ptr;
0667 
0668     if (destroy)
0669         while ((ptr = ptr_ring_consume(r)))
0670             destroy(ptr);
0671     kvfree(r->queue);
0672 }
0673 
0674 #endif /* _LINUX_PTR_RING_H  */