// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"

struct ring {
    ring_buffer_sample_fn sample_cb;    /* per-ring sample callback */
    void *ctx;                          /* opaque pointer passed to sample_cb */
    void *data;                         /* start of the double-mapped data pages */
    unsigned long *consumer_pos;        /* writable consumer position page */
    unsigned long *producer_pos;        /* read-only producer position page */
    unsigned long mask;                 /* ring data size - 1, used for wrap-around */
    int map_fd;                         /* fd of the BPF_MAP_TYPE_RINGBUF map */
};

struct ring_buffer {
    struct epoll_event *events;
    struct ring *rings;
    size_t page_size;
    int epoll_fd;
    int ring_cnt;
};

static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
{
    if (r->consumer_pos) {
        munmap(r->consumer_pos, rb->page_size);
        r->consumer_pos = NULL;
    }
    if (r->producer_pos) {
        munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
        r->producer_pos = NULL;
    }
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
             ring_buffer_sample_fn sample_cb, void *ctx)
{
    struct bpf_map_info info;
    __u32 len = sizeof(info);
    struct epoll_event *e;
    struct ring *r;
    void *tmp;
    int err;

    memset(&info, 0, sizeof(info));

    err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
    if (err) {
        err = -errno;
        pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
            map_fd, err);
        return libbpf_err(err);
    }

    if (info.type != BPF_MAP_TYPE_RINGBUF) {
        pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
            map_fd);
        return libbpf_err(-EINVAL);
    }

    tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
    if (!tmp)
        return libbpf_err(-ENOMEM);
    rb->rings = tmp;

    tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
    if (!tmp)
        return libbpf_err(-ENOMEM);
    rb->events = tmp;

    r = &rb->rings[rb->ring_cnt];
    memset(r, 0, sizeof(*r));

    r->map_fd = map_fd;
    r->sample_cb = sample_cb;
    r->ctx = ctx;
    r->mask = info.max_entries - 1;

    /* Map writable consumer page */
    tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
           map_fd, 0);
    if (tmp == MAP_FAILED) {
        err = -errno;
        pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
            map_fd, err);
        return libbpf_err(err);
    }
    r->consumer_pos = tmp;

    /* Map read-only producer page and data pages. We map twice as big
     * data size to allow simple reading of samples that wrap around the
     * end of a ring buffer. See kernel implementation for details.
     */
    tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, PROT_READ,
           MAP_SHARED, map_fd, rb->page_size);
    if (tmp == MAP_FAILED) {
        err = -errno;
        ringbuf_unmap_ring(rb, r);
        pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
            map_fd, err);
        return libbpf_err(err);
    }
    r->producer_pos = tmp;
    r->data = tmp + rb->page_size;

    e = &rb->events[rb->ring_cnt];
    memset(e, 0, sizeof(*e));

    e->events = EPOLLIN;
    e->data.fd = rb->ring_cnt;
    if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
        err = -errno;
        ringbuf_unmap_ring(rb, r);
        pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
            map_fd, err);
        return libbpf_err(err);
    }

    rb->ring_cnt++;
    return 0;
}
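
/* Multi-ring usage sketch (illustrative, not part of this file): a single
 * manager can service several BPF_MAP_TYPE_RINGBUF maps, each with its own
 * callback and context. The map fds and handle_a()/handle_b() callbacks
 * below are hypothetical; callbacks must match ring_buffer_sample_fn.
 */
#if 0 /* illustrative only */
static int handle_a(void *ctx, void *data, size_t size);
static int handle_b(void *ctx, void *data, size_t size);

static struct ring_buffer *example_setup(int map_fd_a, int map_fd_b)
{
    struct ring_buffer *rb;

    rb = ring_buffer__new(map_fd_a, handle_a, NULL, NULL);
    if (!rb)
        return NULL;

    /* register a second ring with a different callback */
    if (ring_buffer__add(rb, map_fd_b, handle_b, NULL)) {
        ring_buffer__free(rb);
        return NULL;
    }
    return rb;
}
#endif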

void ring_buffer__free(struct ring_buffer *rb)
{
    int i;

    if (!rb)
        return;

    for (i = 0; i < rb->ring_cnt; ++i)
        ringbuf_unmap_ring(rb, &rb->rings[i]);
    if (rb->epoll_fd >= 0)
        close(rb->epoll_fd);

    free(rb->events);
    free(rb->rings);
    free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
         const struct ring_buffer_opts *opts)
{
    struct ring_buffer *rb;
    int err;

    if (!OPTS_VALID(opts, ring_buffer_opts))
        return errno = EINVAL, NULL;

    rb = calloc(1, sizeof(*rb));
    if (!rb)
        return errno = ENOMEM, NULL;

    rb->page_size = getpagesize();

    rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (rb->epoll_fd < 0) {
        err = -errno;
        pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
        goto err_out;
    }

    err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
    if (err)
        goto err_out;

    return rb;

err_out:
    ring_buffer__free(rb);
    return errno = -err, NULL;
}
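
/* End-to-end usage sketch (illustrative, not part of this file): create a
 * manager for one ring, poll it until an error occurs, then free it. The
 * handle_event() callback and map_fd parameter are hypothetical.
 */
#if 0 /* illustrative only */
static int handle_event(void *ctx, void *data, size_t size)
{
    /* process one committed sample of `size` bytes */
    return 0;
}

static int example_consume_loop(int map_fd)
{
    struct ring_buffer *rb;
    int err;

    rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
    if (!rb)
        return -errno;

    /* block for up to 100 ms per iteration; a negative return means a
     * callback or epoll_wait() failed (-EINTR just means a signal arrived)
     */
    for (;;) {
        err = ring_buffer__poll(rb, 100);
        if (err == -EINTR)
            continue;
        if (err < 0)
            break;
    }

    ring_buffer__free(rb);
    return err;
}
#endif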

static inline int roundup_len(__u32 len)
{
    /* clear out top 2 bits (discard and busy, if set) */
    len <<= 2;
    len >>= 2;
    /* add length prefix */
    len += BPF_RINGBUF_HDR_SZ;
    /* round up to 8 byte alignment */
    return (len + 7) / 8 * 8;
}
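
/* Worked example (illustrative): for a committed 12-byte sample with no flag
 * bits set, roundup_len() yields 8 (header) + 12 = 20, rounded up to 24, so
 * the consumer position advances by 24 bytes; a 24-byte sample advances it
 * by exactly 32.
 */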

/* Process all committed records available in one ring. The acquire load of
 * the record header synchronizes with the kernel's commit, which clears the
 * BUSY bit; the release store of consumer_pos publishes the freed space to
 * the kernel, which reads consumer_pos with an acquire load before reserving
 * new records.
 */
static int64_t ringbuf_process_ring(struct ring *r)
{
    int *len_ptr, len, err;
    /* 64-bit to avoid overflow in case of extreme application behavior */
    int64_t cnt = 0;
    unsigned long cons_pos, prod_pos;
    bool got_new_data;
    void *sample;

    cons_pos = smp_load_acquire(r->consumer_pos);
    do {
        got_new_data = false;
        prod_pos = smp_load_acquire(r->producer_pos);
        while (cons_pos < prod_pos) {
            len_ptr = r->data + (cons_pos & r->mask);
            len = smp_load_acquire(len_ptr);

            /* sample not committed yet, bail out for now */
            if (len & BPF_RINGBUF_BUSY_BIT)
                goto done;

            got_new_data = true;
            cons_pos += roundup_len(len);

            if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
                sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
                err = r->sample_cb(r->ctx, sample, len);
                if (err < 0) {
                    /* update consumer pos and bail out */
                    smp_store_release(r->consumer_pos,
                              cons_pos);
                    return err;
                }
                cnt++;
            }

            smp_store_release(r->consumer_pos, cons_pos);
        }
    } while (got_new_data);
done:
    return cnt;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns the number of records consumed across all registered ring buffers
 * (or INT_MAX, whichever is less), or a negative number if any of the
 * callbacks returns an error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
    int64_t err, res = 0;
    int i;

    for (i = 0; i < rb->ring_cnt; i++) {
        struct ring *ring = &rb->rings[i];

        err = ringbuf_process_ring(ring);
        if (err < 0)
            return libbpf_err(err);
        res += err;
    }
    if (res > INT_MAX)
        return INT_MAX;
    return res;
}
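
/* Busy-polling sketch (illustrative, not part of this file): latency-sensitive
 * consumers can skip epoll entirely and drain all registered rings in a tight
 * loop. The rb and exiting arguments are hypothetical caller state.
 */
#if 0 /* illustrative only */
static int example_busy_poll(struct ring_buffer *rb, volatile bool *exiting)
{
    int err = 0;

    while (!*exiting) {
        err = ring_buffer__consume(rb);
        if (err < 0)
            break;
    }
    return err;
}
#endif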

/* Poll for available data and consume records, if any are available.
 * Returns the number of records consumed (or INT_MAX, whichever is less), or
 * a negative number if any of the registered callbacks returned an error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
    int i, cnt;
    int64_t err, res = 0;

    cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
    if (cnt < 0)
        return libbpf_err(-errno);

    for (i = 0; i < cnt; i++) {
        __u32 ring_id = rb->events[i].data.fd;
        struct ring *ring = &rb->rings[ring_id];

        err = ringbuf_process_ring(ring);
        if (err < 0)
            return libbpf_err(err);
        res += err;
    }
    if (res > INT_MAX)
        return INT_MAX;
    return res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
    return rb->epoll_fd;
}
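
/* Event-loop integration sketch (illustrative, not part of this file):
 * register the manager's epoll fd with a caller-owned epoll set and drain
 * the rings when it becomes readable. my_epoll_fd is a hypothetical fd for
 * the caller's own event loop.
 */
#if 0 /* illustrative only */
static int example_attach_to_loop(struct ring_buffer *rb, int my_epoll_fd)
{
    struct epoll_event ev = {
        .events = EPOLLIN,
        .data.ptr = rb,
    };

    if (epoll_ctl(my_epoll_fd, EPOLL_CTL_ADD, ring_buffer__epoll_fd(rb), &ev))
        return -errno;
    return 0;
}

/* later, when the caller's loop reports the fd readable: */
static int example_on_readable(struct ring_buffer *rb)
{
    return ring_buffer__consume(rb);
}
#endif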