/*
 * Definitions for the 'struct ptr_ring' datastructure.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from the FIFO.
 *
 * The implementation tries to minimize cache contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: unlike __ptr_ring_empty, callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
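
/*
 * Example (illustrative sketch, not part of this header's API): a
 * process-context producer feeding a consumer that runs in BH context
 * (e.g. NAPI) must disable BH while producing, per the note above, so it
 * uses the _bh variant.  The names example_queue_skb, tx_ring and the
 * drop-on-overflow policy are hypothetical caller-side choices; struct
 * sk_buff and kfree_skb come from <linux/skbuff.h>, which the caller
 * would include.
 *
 *	static int example_queue_skb(struct ptr_ring *tx_ring,
 *				     struct sk_buff *skb)
 *	{
 *		if (ptr_ring_produce_bh(tx_ring, skb)) {
 *			kfree_skb(skb);
 *			return -ENOSPC;
 *		}
 *		return 0;
 *	}
 */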

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index so that the slot "looks" empty to the producer again.
	 * However, zeroing out a slot hands it back to the producer,
	 * and doing that for every consumed entry would bounce the
	 * corresponding cache line between the consumer and the producer
	 * on each consume.
	 *
	 * To keep the common path cheap, consumer_head is advanced for
	 * every consumed entry, but consumed slots are only zeroed out
	 * (released to the producer) in batches of r->batch entries;
	 * consumer_tail tracks the first slot that has not yet been
	 * released.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have consumed a full batch, or when we reach the end of
	 * the array, invalidate all entries consumed so far and publish
	 * the new tail.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in the reverse order: this way we touch the
		 * cache line that the producer might currently be reading
		 * last; the producer will not yet need the cache line that
		 * we zero out first.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
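
/*
 * Example (illustrative sketch, not part of this header's API): a consumer
 * that drains entries in batches to amortize locking.  It assumes all
 * consumers run in process context; a BH or IRQ consumer would pick the
 * matching _bh/_irq/_any variant instead.  example_drain, handle_obj and
 * the batch size of 16 are hypothetical.
 *
 *	static int example_drain(struct ptr_ring *rx_ring)
 *	{
 *		void *batch[16];
 *		int n, i;
 *
 *		n = ptr_ring_consume_batched(rx_ring, batch, 16);
 *		for (i = 0; i < n; i++)
 *			handle_obj(batch[i]);
 *		return n;
 *	}
 */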

/* Cast to structure type and call a function without discarding from
 * the FIFO. Function must return a value.
 * Callers of __PTR_RING_PEEK_CALL must take consumer_lock; the wrapper
 * macros below take it themselves.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
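
/*
 * Example (illustrative sketch, not part of this header's API): peek at the
 * head entry and compute a value from it without dequeuing it.  The callback
 * receives the head pointer, or NULL when the ring is empty, so it must
 * tolerate NULL.  struct example_obj and example_obj_len are hypothetical.
 *
 *	static int example_obj_len(void *ptr)
 *	{
 *		struct example_obj *obj = ptr;
 *
 *		return obj ? obj->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(ring, example_obj_len);
 */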

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* The batch must be at least 1 for the logic in
	 * __ptr_ring_discard_one to work correctly.  For small rings a
	 * large batch would hold back too many slots from the producer
	 * at once, so fall back to releasing entries one by one.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
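
/*
 * Example (illustrative sketch, not part of this header's API): typical
 * lifecycle of a ring embedded in a hypothetical driver structure.  The
 * ring size of 512, GFP_KERNEL and the example_dev/example_free names are
 * assumptions of this sketch.
 *
 *	struct example_dev {
 *		struct ptr_ring ring;
 *	};
 *
 *	static int example_dev_open(struct example_dev *dev)
 *	{
 *		return ptr_ring_init(&dev->ring, 512, GFP_KERNEL);
 *	}
 *
 *	static void example_dev_close(struct example_dev *dev)
 *	{
 *		ptr_ring_cleanup(&dev->ring, example_free);
 *	}
 */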

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries first (slots that were consumed but
	 * not yet released to the producer), so the code below only has
	 * to test slots for NULL.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run out of space.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
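
/*
 * Example (illustrative sketch, not part of this header's API): a consumer
 * pulls a batch, processes only part of it, and returns the unprocessed
 * tail to the ring; entries that no longer fit are freed through the
 * callback.  example_process_some, example_free and the batch size are
 * hypothetical.
 *
 *	void *batch[16];
 *	int n = ptr_ring_consume_batched(r, batch, 16);
 *	int done = example_process_some(batch, n);
 *
 *	ptr_ring_unconsume(r, batch + done, n - done, example_free);
 */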

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
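
/*
 * Example (illustrative sketch, not part of this header's API): grow a ring
 * when it keeps filling up.  On allocation failure the old ring is left
 * untouched; when shrinking, entries that do not fit are passed to the
 * destroy callback.  The new size of 1024 and example_free are hypothetical.
 *
 *	int err = ptr_ring_resize(r, 1024, GFP_KERNEL, example_free);
 *
 *	if (err)
 *		return err;
 */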

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}
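
/*
 * Example (illustrative sketch, not part of this header's API): a destroy
 * callback that frees whatever the caller queued.  Here the queued objects
 * are assumed to be kmalloc'ed buffers; pass NULL instead of a callback if
 * the ring is already known to be empty.
 *
 *	static void example_free(void *ptr)
 *	{
 *		kfree(ptr);
 *	}
 *
 *	ptr_ring_cleanup(r, example_free);
 */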

#endif /* _LINUX_PTR_RING_H */