/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
#include <net/xsk_buff_pool.h>

#include "xsk.h"

struct xdp_ring {
    u32 producer ____cacheline_aligned_in_smp;
    /* Hinder the adjacent cache prefetcher from prefetching the consumer
     * pointer when the producer pointer is touched and vice versa.
     */
    u32 pad1 ____cacheline_aligned_in_smp;
    u32 consumer ____cacheline_aligned_in_smp;
    u32 pad2 ____cacheline_aligned_in_smp;
    u32 flags;
    u32 pad3 ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
    struct xdp_ring ptrs;
    struct xdp_desc desc[] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
    struct xdp_ring ptrs;
    u64 desc[] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
    u32 ring_mask;
    u32 nentries;
    u32 cached_prod;
    u32 cached_cons;
    struct xdp_ring *ring;
    u64 invalid_descs;
    u64 queue_empty_descs;
};

/* The shared state of each ring is a simple circular buffer, as
 * outlined in Documentation/core-api/circular-buffers.rst. For the Rx
 * and completion rings, the kernel is the producer and user space is
 * the consumer. For the Tx and fill rings, the kernel is the consumer
 * and user space is the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
 *    STORE $data                   LOAD $data
 *    STORE.rel ->producer (B)      STORE.rel ->consumer (D)
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it ensures that the data is written before the
 * producer pointer is updated. If this barrier was missing, the
 * consumer could observe the producer pointer being set, load the data
 * before the producer has written the new value, and thus read stale
 * data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer has actually been read. Without this barrier,
 * some architectures could load old data, since speculative loads are
 * not discarded and the CPU does not know there is a dependency
 * between ->producer and the data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. If ->consumer indicates there is no room
 * in the buffer, $data is not stored. The control dependency orders
 * both stores after the load, so no explicit barrier is needed.
 *
 * (D) ensures that the load of the data is observed to happen before
 * the store of the consumer pointer. Without this barrier, the
 * producer could observe the consumer pointer being set and overwrite
 * the data with a new value before the consumer got the chance to read
 * the old value. The consumer would then miss the old entry and very
 * likely read the new entry twice, once right away and again after
 * circling through the ring.
 */
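
/* A minimal sketch of the pairing above: a single-entry hand-off over
 * a bare struct xdp_ring, using the same primitives as the helpers
 * further down. The __xskq_example_* names are purely illustrative
 * and not used by the stack; the real rings below batch their pointer
 * updates through cached_prod/cached_cons instead of touching the
 * shared pointers for every entry.
 */
static inline bool __xskq_example_produce_one(struct xdp_ring *r, u64 *slot, u64 val)
{
    /* (A) load of ->consumer; the branch orders the stores below */
    if (r->producer - READ_ONCE(r->consumer) == 1)
        return false;            /* no room, do not store */

    *slot = val;                /* STORE $data */
    /* (B) publish the new entry; pairs with (C) */
    smp_store_release(&r->producer, r->producer + 1);
    return true;
}

static inline bool __xskq_example_consume_one(struct xdp_ring *r, u64 *slot, u64 *val)
{
    /* (C) acquire the producer pointer; pairs with (B) */
    if (smp_load_acquire(&r->producer) == r->consumer)
        return false;            /* ring empty */

    *val = *slot;                /* LOAD $data */
    /* (D) hand the slot back to the producer; pairs with (A) */
    smp_store_release(&r->consumer, r->consumer + 1);
    return true;
}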

/* The operations on the rings are the following:
 *
 * producer                           consumer
 *
 * RESERVE entries                    PEEK in the ring for entries
 * WRITE data into the ring           READ data from the ring
 * SUBMIT entries                     RELEASE entries
 *
 * The producer reserves one or more entries in the ring. It can then
 * fill in these entries and finally submit them so that they can be
 * seen and read by the consumer.
 *
 * The consumer peeks into the ring to see if the producer has written
 * any new entries. If so, the consumer can then read these entries
 * and, when it is done reading them, release them back to the producer
 * so that the producer can use these slots to fill in new entries.
 *
 * The function names below reflect these operations; minimal call
 * sequences for the consumer and producer sides are sketched next to
 * the corresponding helpers further down.
 */

/* Functions that read and validate content from consumer rings. */

static inline void __xskq_cons_read_addr_unchecked(struct xsk_queue *q, u32 cached_cons, u64 *addr)
{
    struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    u32 idx = cached_cons & q->ring_mask;

    *addr = ring->desc[idx];
}

static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
    if (q->cached_cons != q->cached_prod) {
        __xskq_cons_read_addr_unchecked(q, q->cached_cons, addr);
        return true;
    }

    return false;
}

static inline bool xp_aligned_validate_desc(struct xsk_buff_pool *pool,
                        struct xdp_desc *desc)
{
    u64 chunk, chunk_end;

    chunk = xp_aligned_extract_addr(pool, desc->addr);
    if (likely(desc->len)) {
        chunk_end = xp_aligned_extract_addr(pool, desc->addr + desc->len - 1);
        if (chunk != chunk_end)
            return false;
    }

    if (chunk >= pool->addrs_cnt)
        return false;

    if (desc->options)
        return false;
    return true;
}

static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
                          struct xdp_desc *desc)
{
    u64 addr, base_addr;

    base_addr = xp_unaligned_extract_addr(desc->addr);
    addr = xp_unaligned_add_offset_to_addr(desc->addr);

    if (desc->len > pool->chunk_size)
        return false;

    if (base_addr >= pool->addrs_cnt || addr >= pool->addrs_cnt ||
        xp_desc_crosses_non_contig_pg(pool, addr, desc->len))
        return false;

    if (desc->options)
        return false;
    return true;
}

static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
                    struct xdp_desc *desc)
{
    return pool->unaligned ? xp_unaligned_validate_desc(pool, desc) :
        xp_aligned_validate_desc(pool, desc);
}

static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
                       struct xdp_desc *d,
                       struct xsk_buff_pool *pool)
{
    if (!xp_validate_desc(pool, d)) {
        q->invalid_descs++;
        return false;
    }
    return true;
}

static inline bool xskq_cons_read_desc(struct xsk_queue *q,
                       struct xdp_desc *desc,
                       struct xsk_buff_pool *pool)
{
    while (q->cached_cons != q->cached_prod) {
        struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
        u32 idx = q->cached_cons & q->ring_mask;

        *desc = ring->desc[idx];
        if (xskq_cons_is_valid_desc(q, desc, pool))
            return true;

        q->cached_cons++;
    }

    return false;
}

static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
                        u32 max)
{
    u32 cached_cons = q->cached_cons, nb_entries = 0;
    struct xdp_desc *descs = pool->tx_descs;

    while (cached_cons != q->cached_prod && nb_entries < max) {
        struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
        u32 idx = cached_cons & q->ring_mask;

        descs[nb_entries] = ring->desc[idx];
        if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
            /* Skip the entry */
            cached_cons++;
            continue;
        }

        nb_entries++;
        cached_cons++;
    }

    return nb_entries;
}

/* Functions for consumers */

static inline void __xskq_cons_release(struct xsk_queue *q)
{
    smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matches A */
}

static inline void __xskq_cons_peek(struct xsk_queue *q)
{
    /* Refresh the local pointer */
    q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
}

static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
    __xskq_cons_release(q);
    __xskq_cons_peek(q);
}

static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
{
    u32 entries = q->cached_prod - q->cached_cons;

    if (entries >= max)
        return max;

    __xskq_cons_peek(q);
    entries = q->cached_prod - q->cached_cons;

    return entries >= max ? max : entries;
}

static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
    return xskq_cons_nb_entries(q, cnt) >= cnt;
}

static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
    if (q->cached_prod == q->cached_cons)
        xskq_cons_get_entries(q);
    return xskq_cons_read_addr_unchecked(q, addr);
}

static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
                       struct xdp_desc *desc,
                       struct xsk_buff_pool *pool)
{
    if (q->cached_prod == q->cached_cons)
        xskq_cons_get_entries(q);
    return xskq_cons_read_desc(q, desc, pool);
}

/* To improve performance in the xskq_cons_release functions, only update local state here.
 * Reflect this to global state when we get new entries from the ring in
 * xskq_cons_get_entries() and whenever Rx or Tx processing is completed in the NAPI loop.
 */
static inline void xskq_cons_release(struct xsk_queue *q)
{
    q->cached_cons++;
}

static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
{
    q->cached_cons += cnt;
}
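
/* A minimal sketch of the consumer-side flow (PEEK, READ, RELEASE)
 * described at the top of this file, here for a Tx descriptor ring.
 * The caller and its budget handling are hypothetical; only the
 * xskq_* calls come from this file. Note that the shared consumer
 * pointer is published once, after the batch, in line with the
 * comment above xskq_cons_release().
 */
static inline u32 __xskq_example_consume_tx(struct xsk_queue *q,
                       struct xsk_buff_pool *pool,
                       u32 budget)
{
    struct xdp_desc desc;
    u32 done = 0;

    /* PEEK + READ: copy out one validated descriptor at a time */
    while (done < budget && xskq_cons_peek_desc(q, &desc, pool)) {
        /* ... transmit the frame described by desc.addr/desc.len ... */
        done++;
        /* RELEASE: advance the local consumer pointer only */
        xskq_cons_release(q);
    }

    /* Reflect the local state to the shared ring (D above) */
    __xskq_cons_release(q);
    return done;
}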

static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
{
    /* No barriers needed since data is not accessed */
    return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer);
}

/* Functions for producers */

static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
{
    u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);

    if (free_entries >= max)
        return max;

    /* Refresh the local tail pointer */
    q->cached_cons = READ_ONCE(q->ring->consumer);
    free_entries = q->nentries - (q->cached_prod - q->cached_cons);

    return free_entries >= max ? max : free_entries;
}

static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
    return xskq_prod_nb_free(q, 1) ? false : true;
}

static inline void xskq_prod_cancel(struct xsk_queue *q)
{
    q->cached_prod--;
}

static inline int xskq_prod_reserve(struct xsk_queue *q)
{
    if (xskq_prod_is_full(q))
        return -ENOSPC;

    /* A, matches D */
    q->cached_prod++;
    return 0;
}

static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
    struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

    if (xskq_prod_is_full(q))
        return -ENOSPC;

    /* A, matches D */
    ring->desc[q->cached_prod++ & q->ring_mask] = addr;
    return 0;
}

static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
                           u32 max)
{
    struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    u32 nb_entries, i, cached_prod;

    nb_entries = xskq_prod_nb_free(q, max);

    /* A, matches D */
    cached_prod = q->cached_prod;
    for (i = 0; i < nb_entries; i++)
        ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
    q->cached_prod = cached_prod;

    return nb_entries;
}

static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
                     u64 addr, u32 len)
{
    struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
    u32 idx;

    if (xskq_prod_is_full(q))
        return -ENOBUFS;

    /* A, matches D */
    idx = q->cached_prod++ & q->ring_mask;
    ring->desc[idx].addr = addr;
    ring->desc[idx].len = len;

    return 0;
}

static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
    smp_store_release(&q->ring->producer, idx); /* B, matches C */
}

static inline void xskq_prod_submit(struct xsk_queue *q)
{
    __xskq_prod_submit(q, q->cached_prod);
}
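
/* A minimal sketch of the producer-side flow (RESERVE, WRITE, SUBMIT)
 * for an Rx descriptor ring. The caller is hypothetical; in this file
 * xskq_prod_reserve_desc() both reserves the slot and writes the
 * descriptor, and xskq_prod_submit() publishes it to the consumer.
 */
static inline int __xskq_example_produce_rx(struct xsk_queue *q, u64 addr, u32 len)
{
    int err;

    /* RESERVE + WRITE: fill the next free slot, if there is one */
    err = xskq_prod_reserve_desc(q, addr, len);
    if (err)
        return err;    /* -ENOBUFS: the consumer has not freed any slots */

    /* SUBMIT: make the new entry visible to the consumer (B above) */
    xskq_prod_submit(q);
    return 0;
}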

static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
    struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
    u32 idx = q->ring->producer;

    ring->desc[idx++ & q->ring_mask] = addr;

    __xskq_prod_submit(q, idx);
}

static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
    __xskq_prod_submit(q, q->ring->producer + nb_entries);
}

static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
    /* No barriers needed since data is not accessed */
    return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}

/* For both producers and consumers */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
    return q ? q->invalid_descs : 0;
}

static inline u64 xskq_nb_queue_empty_descs(struct xsk_queue *q)
{
    return q ? q->queue_empty_descs : 0;
}

struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

#endif /* _LINUX_XSK_QUEUE_H */