Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  * Copyright 2018 Google Inc.
0004  * Author: Eric Dumazet (edumazet@google.com)
0005  *
0006  * Reference program demonstrating tcp mmap() usage,
0007  * and SO_RCVLOWAT hints for receiver.
0008  *
0009  * Note : NIC with header split is needed to use mmap() on TCP :
0010  * Each incoming frame must be a multiple of PAGE_SIZE bytes of TCP payload.
0011  *
0012  * How to use on loopback interface :
0013  *
0014  *  ifconfig lo mtu 61512  # 15*4096 + 40 (ipv6 header) + 32 (TCP with TS option header)
0015  *  tcp_mmap -s -z &
0016  *  tcp_mmap -H ::1 -z
0017  *
0018  *  Or leave default lo mtu, but use -M option to set TCP_MAXSEG option to (4096 + 12)
0019  *      (4096 : page size on x86, 12: TCP TS option length)
0020  *  tcp_mmap -s -z -M $((4096+12)) &
0021  *  tcp_mmap -H ::1 -z -M $((4096+12))
0022  *
0023  * Note: -z option on sender uses MSG_ZEROCOPY, which forces a copy when packets go through loopback interface.
0024  *       We might use sendfile() instead, but really this test program is about mmap(), for receivers ;)
0025  *
0026  * $ ./tcp_mmap -s &                                 # Without mmap()
0027  * $ for i in {1..4}; do ./tcp_mmap -H ::1 -z ; done
0028  * received 32768 MB (0 % mmap'ed) in 14.1157 s, 19.4732 Gbit
0029  *   cpu usage user:0.057 sys:7.815, 240.234 usec per MB, 65531 c-switches
0030  * received 32768 MB (0 % mmap'ed) in 14.6833 s, 18.7204 Gbit
0031  *  cpu usage user:0.043 sys:8.103, 248.596 usec per MB, 65524 c-switches
0032  * received 32768 MB (0 % mmap'ed) in 11.143 s, 24.6682 Gbit
0033  *   cpu usage user:0.044 sys:6.576, 202.026 usec per MB, 65519 c-switches
0034  * received 32768 MB (0 % mmap'ed) in 14.9056 s, 18.4413 Gbit
0035  *   cpu usage user:0.036 sys:8.193, 251.129 usec per MB, 65530 c-switches
0036  * $ kill %1   # kill tcp_mmap server
0037  *
0038  * $ ./tcp_mmap -s -z &                              # With mmap()
0039  * $ for i in {1..4}; do ./tcp_mmap -H ::1 -z ; done
0040  * received 32768 MB (99.9939 % mmap'ed) in 6.73792 s, 40.7956 Gbit
0041  *   cpu usage user:0.045 sys:2.827, 87.6465 usec per MB, 65532 c-switches
0042  * received 32768 MB (99.9939 % mmap'ed) in 7.26732 s, 37.8238 Gbit
0043  *   cpu usage user:0.037 sys:3.087, 95.3369 usec per MB, 65532 c-switches
0044  * received 32768 MB (99.9939 % mmap'ed) in 7.61661 s, 36.0893 Gbit
0045  *   cpu usage user:0.046 sys:3.559, 110.016 usec per MB, 65529 c-switches
0046  * received 32768 MB (99.9939 % mmap'ed) in 7.43764 s, 36.9577 Gbit
0047  *   cpu usage user:0.035 sys:3.467, 106.873 usec per MB, 65530 c-switches
0048  */
0049 #define _GNU_SOURCE
0050 #include <pthread.h>
0051 #include <sys/types.h>
0052 #include <fcntl.h>
0053 #include <error.h>
0054 #include <sys/socket.h>
0055 #include <sys/mman.h>
0056 #include <sys/resource.h>
0057 #include <unistd.h>
0058 #include <string.h>
0059 #include <stdlib.h>
0060 #include <stdio.h>
0061 #include <errno.h>
0062 #include <time.h>
0063 #include <sys/time.h>
0064 #include <netinet/in.h>
0065 #include <arpa/inet.h>
0066 #include <poll.h>
0067 #include <linux/tcp.h>
0068 #include <assert.h>
0069 
0070 #ifndef MSG_ZEROCOPY
0071 #define MSG_ZEROCOPY    0x4000000
0072 #endif
0073 
0074 #define FILE_SZ (1ULL << 35)
0075 static int cfg_family = AF_INET6; /* Socket family; switched with the -4 / -6 options */
0076 static socklen_t cfg_alen = sizeof(struct sockaddr_in6); /* Address length matching cfg_family */
0077 static int cfg_port = 8787; /* TCP port.  Can be set with -p <integer> option */
0078 
0079 static int rcvbuf; /* Default: autotuning.  Can be set with -r <integer> option */
0080 static int sndbuf; /* Default: autotuning.  Can be set with -w <integer> option */
0081 static int zflg; /* zero copy option (-z): MSG_ZEROCOPY for sender, mmap() for receiver */
0082 static int xflg; /* hash received data (simple xor) (-x option) */
0083 static int keepflag; /* -k option: receiver shall keep all received file in memory (no munmap() calls). NOTE(review): no reader of this flag is visible in this file */
0084 
0085 static size_t chunk_size  = 512*1024; /* Bytes handled per send/receive step.  Can be set with -C <integer> option */
0086 
0087 static size_t map_align; /* Alignment of mappings (-a option); main() defaults it to the huge page size */
0088 
/* Accumulated XOR of everything that has been passed to hash_zone(). */
unsigned long htotal;

/* Ask the CPU to bring the cache line holding *x closer.
 * Compiles to nothing on architectures other than x86_64.
 */
static inline void prefetch(const void *x)
{
#if defined(__x86_64__)
    /* __asm__ instead of asm: the plain keyword is not recognised when
     * the compiler runs in a strict ISO mode (-std=c99, -std=c11).
     */
    __asm__ volatile("prefetcht0 %P0" : : "m" (*(const char *)x));
#endif
}

/* Fold @length bytes starting at @zone into htotal using XOR.
 *
 * The bulk of the data is consumed in groups of eight longs; whatever is
 * left is folded in one byte at a time.
 *
 * Words are fetched with memcpy() through an unsigned char cursor, so
 * @zone may have any alignment (callers pass buffer + offset) and no
 * arithmetic is done on a void pointer.
 */
void hash_zone(void *zone, unsigned int length)
{
    const unsigned char *cursor = zone;
    unsigned long temp = htotal;
    unsigned long words[8];

    while (length >= sizeof(words)) {
        /* Only a hint: the address is never dereferenced here. */
        prefetch(cursor + 384);
        memcpy(words, cursor, sizeof(words));
        temp ^= words[0];
        temp ^= words[1];
        temp ^= words[2];
        temp ^= words[3];
        temp ^= words[4];
        temp ^= words[5];
        temp ^= words[6];
        temp ^= words[7];
        cursor += sizeof(words);
        length -= sizeof(words);
    }
    while (length >= 1) {
        temp ^= *cursor;
        cursor += 1;
        length--;
    }
    htotal = temp;
}
0122 
/* Round x up to the next multiple of align_to.
 * align_to must be a power of two (the mask below relies on it) and is
 * expanded twice, so it must not have side effects.
 */
#define ALIGN_UP(x, align_to)   (((x) + ((align_to)-1)) & ~((align_to)-1))
/* Pointer flavour of ALIGN_UP(): the result has the same type as p.
 * __typeof__ is used because plain typeof is not a keyword when compiling
 * with -std=c99 / -std=c11.
 */
#define ALIGN_PTR_UP(p, ptr_align_to)   ((__typeof__(p))ALIGN_UP((unsigned long)(p), ptr_align_to))
0125 
0126 
0127 static void *mmap_large_buffer(size_t need, size_t *allocated)
0128 {
0129     void *buffer;
0130     size_t sz;
0131 
0132     /* Attempt to use huge pages if possible. */
0133     sz = ALIGN_UP(need, map_align);
0134     buffer = mmap(NULL, sz, PROT_READ | PROT_WRITE,
0135               MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
0136 
0137     if (buffer == (void *)-1) {
0138         sz = need;
0139         buffer = mmap(NULL, sz, PROT_READ | PROT_WRITE,
0140                   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
0141         if (buffer != (void *)-1)
0142             fprintf(stderr, "MAP_HUGETLB attempt failed, look at /sys/kernel/mm/hugepages for optimal performance\n");
0143     }
0144     *allocated = sz;
0145     return buffer;
0146 }
0147 
0148 void *child_thread(void *arg)
0149 {
0150     unsigned long total_mmap = 0, total = 0;
0151     struct tcp_zerocopy_receive zc;
0152     unsigned long delta_usec;
0153     int flags = MAP_SHARED;
0154     struct timeval t0, t1;
0155     char *buffer = NULL;
0156     void *raddr = NULL;
0157     void *addr = NULL;
0158     double throughput;
0159     struct rusage ru;
0160     size_t buffer_sz;
0161     int lu, fd;
0162 
0163     fd = (int)(unsigned long)arg;
0164 
0165     gettimeofday(&t0, NULL);
0166 
0167     fcntl(fd, F_SETFL, O_NDELAY);
0168     buffer = mmap_large_buffer(chunk_size, &buffer_sz);
0169     if (buffer == (void *)-1) {
0170         perror("mmap");
0171         goto error;
0172     }
0173     if (zflg) {
0174         raddr = mmap(NULL, chunk_size + map_align, PROT_READ, flags, fd, 0);
0175         if (raddr == (void *)-1) {
0176             perror("mmap");
0177             zflg = 0;
0178         } else {
0179             addr = ALIGN_PTR_UP(raddr, map_align);
0180         }
0181     }
0182     while (1) {
0183         struct pollfd pfd = { .fd = fd, .events = POLLIN, };
0184         int sub;
0185 
0186         poll(&pfd, 1, 10000);
0187         if (zflg) {
0188             socklen_t zc_len = sizeof(zc);
0189             int res;
0190 
0191             memset(&zc, 0, sizeof(zc));
0192             zc.address = (__u64)((unsigned long)addr);
0193             zc.length = chunk_size;
0194 
0195             res = getsockopt(fd, IPPROTO_TCP, TCP_ZEROCOPY_RECEIVE,
0196                      &zc, &zc_len);
0197             if (res == -1)
0198                 break;
0199 
0200             if (zc.length) {
0201                 assert(zc.length <= chunk_size);
0202                 total_mmap += zc.length;
0203                 if (xflg)
0204                     hash_zone(addr, zc.length);
0205                 /* It is more efficient to unmap the pages right now,
0206                  * instead of doing this in next TCP_ZEROCOPY_RECEIVE.
0207                  */
0208                 madvise(addr, zc.length, MADV_DONTNEED);
0209                 total += zc.length;
0210             }
0211             if (zc.recv_skip_hint) {
0212                 assert(zc.recv_skip_hint <= chunk_size);
0213                 lu = read(fd, buffer, zc.recv_skip_hint);
0214                 if (lu > 0) {
0215                     if (xflg)
0216                         hash_zone(buffer, lu);
0217                     total += lu;
0218                 }
0219             }
0220             continue;
0221         }
0222         sub = 0;
0223         while (sub < chunk_size) {
0224             lu = read(fd, buffer + sub, chunk_size - sub);
0225             if (lu == 0)
0226                 goto end;
0227             if (lu < 0)
0228                 break;
0229             if (xflg)
0230                 hash_zone(buffer + sub, lu);
0231             total += lu;
0232             sub += lu;
0233         }
0234     }
0235 end:
0236     gettimeofday(&t1, NULL);
0237     delta_usec = (t1.tv_sec - t0.tv_sec) * 1000000 + t1.tv_usec - t0.tv_usec;
0238 
0239     throughput = 0;
0240     if (delta_usec)
0241         throughput = total * 8.0 / (double)delta_usec / 1000.0;
0242     getrusage(RUSAGE_THREAD, &ru);
0243     if (total > 1024*1024) {
0244         unsigned long total_usec;
0245         unsigned long mb = total >> 20;
0246         total_usec = 1000000*ru.ru_utime.tv_sec + ru.ru_utime.tv_usec +
0247                  1000000*ru.ru_stime.tv_sec + ru.ru_stime.tv_usec;
0248         printf("received %lg MB (%lg %% mmap'ed) in %lg s, %lg Gbit\n"
0249                "  cpu usage user:%lg sys:%lg, %lg usec per MB, %lu c-switches\n",
0250                 total / (1024.0 * 1024.0),
0251                 100.0*total_mmap/total,
0252                 (double)delta_usec / 1000000.0,
0253                 throughput,
0254                 (double)ru.ru_utime.tv_sec + (double)ru.ru_utime.tv_usec / 1000000.0,
0255                 (double)ru.ru_stime.tv_sec + (double)ru.ru_stime.tv_usec / 1000000.0,
0256                 (double)total_usec/mb,
0257                 ru.ru_nvcsw);
0258     }
0259 error:
0260     munmap(buffer, buffer_sz);
0261     close(fd);
0262     if (zflg)
0263         munmap(raddr, chunk_size + map_align);
0264     pthread_exit(0);
0265 }
0266 
0267 static void apply_rcvsnd_buf(int fd)
0268 {
0269     if (rcvbuf && setsockopt(fd, SOL_SOCKET,
0270                  SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) == -1) {
0271         perror("setsockopt SO_RCVBUF");
0272     }
0273 
0274     if (sndbuf && setsockopt(fd, SOL_SOCKET,
0275                  SO_SNDBUF, &sndbuf, sizeof(sndbuf)) == -1) {
0276         perror("setsockopt SO_SNDBUF");
0277     }
0278 }
0279 
0280 
0281 static void setup_sockaddr(int domain, const char *str_addr,
0282                struct sockaddr_storage *sockaddr)
0283 {
0284     struct sockaddr_in6 *addr6 = (void *) sockaddr;
0285     struct sockaddr_in *addr4 = (void *) sockaddr;
0286 
0287     switch (domain) {
0288     case PF_INET:
0289         memset(addr4, 0, sizeof(*addr4));
0290         addr4->sin_family = AF_INET;
0291         addr4->sin_port = htons(cfg_port);
0292         if (str_addr &&
0293             inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
0294             error(1, 0, "ipv4 parse error: %s", str_addr);
0295         break;
0296     case PF_INET6:
0297         memset(addr6, 0, sizeof(*addr6));
0298         addr6->sin6_family = AF_INET6;
0299         addr6->sin6_port = htons(cfg_port);
0300         if (str_addr &&
0301             inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
0302             error(1, 0, "ipv6 parse error: %s", str_addr);
0303         break;
0304     default:
0305         error(1, 0, "illegal domain");
0306     }
0307 }
0308 
0309 static void do_accept(int fdlisten)
0310 {
0311     pthread_attr_t attr;
0312     int rcvlowat;
0313 
0314     pthread_attr_init(&attr);
0315     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
0316 
0317     rcvlowat = chunk_size;
0318     if (setsockopt(fdlisten, SOL_SOCKET, SO_RCVLOWAT,
0319                &rcvlowat, sizeof(rcvlowat)) == -1) {
0320         perror("setsockopt SO_RCVLOWAT");
0321     }
0322 
0323     apply_rcvsnd_buf(fdlisten);
0324 
0325     while (1) {
0326         struct sockaddr_in addr;
0327         socklen_t addrlen = sizeof(addr);
0328         pthread_t th;
0329         int fd, res;
0330 
0331         fd = accept(fdlisten, (struct sockaddr *)&addr, &addrlen);
0332         if (fd == -1) {
0333             perror("accept");
0334             continue;
0335         }
0336         res = pthread_create(&th, &attr, child_thread,
0337                      (void *)(unsigned long)fd);
0338         if (res) {
0339             errno = res;
0340             perror("pthread_create");
0341             close(fd);
0342         }
0343     }
0344 }
0345 
/* Each thread should reserve a big enough vma to avoid
 * spinlock collisions in ptl locks.
 * This size is 2MB on x86_64, and is exported in /proc/meminfo.
 *
 * Returns the "Hugepagesize:" value of /proc/meminfo converted from kB
 * to bytes, or 0 when the file cannot be opened or has no such line.
 */
static unsigned long default_huge_page_size(void)
{
    unsigned long bytes = 0;
    char *text = NULL;
    size_t cap = 0;
    FILE *meminfo;

    meminfo = fopen("/proc/meminfo", "r");
    if (meminfo == NULL)
        return 0;

    for (;;) {
        unsigned long kb;

        if (getline(&text, &cap, meminfo) <= 0)
            break;
        if (sscanf(text, "Hugepagesize:       %lu kB", &kb) != 1)
            continue;
        bytes = kb << 10;
        break;
    }

    /* getline() allocated (and possibly grew) the line buffer. */
    free(text);
    fclose(meminfo);
    return bytes;
}
0369 
0370 int main(int argc, char *argv[])
0371 {
0372     struct sockaddr_storage listenaddr, addr;
0373     unsigned int max_pacing_rate = 0;
0374     uint64_t total = 0;
0375     char *host = NULL;
0376     int fd, c, on = 1;
0377     size_t buffer_sz;
0378     char *buffer;
0379     int sflg = 0;
0380     int mss = 0;
0381 
0382     while ((c = getopt(argc, argv, "46p:svr:w:H:zxkP:M:C:a:")) != -1) {
0383         switch (c) {
0384         case '4':
0385             cfg_family = PF_INET;
0386             cfg_alen = sizeof(struct sockaddr_in);
0387             break;
0388         case '6':
0389             cfg_family = PF_INET6;
0390             cfg_alen = sizeof(struct sockaddr_in6);
0391             break;
0392         case 'p':
0393             cfg_port = atoi(optarg);
0394             break;
0395         case 'H':
0396             host = optarg;
0397             break;
0398         case 's': /* server : listen for incoming connections */
0399             sflg++;
0400             break;
0401         case 'r':
0402             rcvbuf = atoi(optarg);
0403             break;
0404         case 'w':
0405             sndbuf = atoi(optarg);
0406             break;
0407         case 'z':
0408             zflg = 1;
0409             break;
0410         case 'M':
0411             mss = atoi(optarg);
0412             break;
0413         case 'x':
0414             xflg = 1;
0415             break;
0416         case 'k':
0417             keepflag = 1;
0418             break;
0419         case 'P':
0420             max_pacing_rate = atoi(optarg) ;
0421             break;
0422         case 'C':
0423             chunk_size = atol(optarg);
0424             break;
0425         case 'a':
0426             map_align = atol(optarg);
0427             break;
0428         default:
0429             exit(1);
0430         }
0431     }
0432     if (!map_align) {
0433         map_align = default_huge_page_size();
0434         /* if really /proc/meminfo is not helping,
0435          * we use the default x86_64 hugepagesize.
0436          */
0437         if (!map_align)
0438             map_align = 2*1024*1024;
0439     }
0440     if (sflg) {
0441         int fdlisten = socket(cfg_family, SOCK_STREAM, 0);
0442 
0443         if (fdlisten == -1) {
0444             perror("socket");
0445             exit(1);
0446         }
0447         apply_rcvsnd_buf(fdlisten);
0448         setsockopt(fdlisten, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
0449 
0450         setup_sockaddr(cfg_family, host, &listenaddr);
0451 
0452         if (mss &&
0453             setsockopt(fdlisten, IPPROTO_TCP, TCP_MAXSEG,
0454                    &mss, sizeof(mss)) == -1) {
0455             perror("setsockopt TCP_MAXSEG");
0456             exit(1);
0457         }
0458         if (bind(fdlisten, (const struct sockaddr *)&listenaddr, cfg_alen) == -1) {
0459             perror("bind");
0460             exit(1);
0461         }
0462         if (listen(fdlisten, 128) == -1) {
0463             perror("listen");
0464             exit(1);
0465         }
0466         do_accept(fdlisten);
0467     }
0468 
0469     buffer = mmap_large_buffer(chunk_size, &buffer_sz);
0470     if (buffer == (char *)-1) {
0471         perror("mmap");
0472         exit(1);
0473     }
0474 
0475     fd = socket(cfg_family, SOCK_STREAM, 0);
0476     if (fd == -1) {
0477         perror("socket");
0478         exit(1);
0479     }
0480     apply_rcvsnd_buf(fd);
0481 
0482     setup_sockaddr(cfg_family, host, &addr);
0483 
0484     if (mss &&
0485         setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &mss, sizeof(mss)) == -1) {
0486         perror("setsockopt TCP_MAXSEG");
0487         exit(1);
0488     }
0489     if (connect(fd, (const struct sockaddr *)&addr, cfg_alen) == -1) {
0490         perror("connect");
0491         exit(1);
0492     }
0493     if (max_pacing_rate &&
0494         setsockopt(fd, SOL_SOCKET, SO_MAX_PACING_RATE,
0495                &max_pacing_rate, sizeof(max_pacing_rate)) == -1)
0496         perror("setsockopt SO_MAX_PACING_RATE");
0497 
0498     if (zflg && setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY,
0499                    &on, sizeof(on)) == -1) {
0500         perror("setsockopt SO_ZEROCOPY, (-z option disabled)");
0501         zflg = 0;
0502     }
0503     while (total < FILE_SZ) {
0504         int64_t wr = FILE_SZ - total;
0505 
0506         if (wr > chunk_size)
0507             wr = chunk_size;
0508         /* Note : we just want to fill the pipe with 0 bytes */
0509         wr = send(fd, buffer, (size_t)wr, zflg ? MSG_ZEROCOPY : 0);
0510         if (wr <= 0)
0511             break;
0512         total += wr;
0513     }
0514     close(fd);
0515     munmap(buffer, buffer_sz);
0516     return 0;
0517 }