0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049 #define _GNU_SOURCE
0050 #include <pthread.h>
0051 #include <sys/types.h>
0052 #include <fcntl.h>
0053 #include <error.h>
0054 #include <sys/socket.h>
0055 #include <sys/mman.h>
0056 #include <sys/resource.h>
0057 #include <unistd.h>
0058 #include <string.h>
0059 #include <stdlib.h>
0060 #include <stdio.h>
0061 #include <errno.h>
0062 #include <time.h>
0063 #include <sys/time.h>
0064 #include <netinet/in.h>
0065 #include <arpa/inet.h>
0066 #include <poll.h>
0067 #include <linux/tcp.h>
0068 #include <assert.h>
0069
/* Fallback for build environments whose headers do not define MSG_ZEROCOPY. */
0070 #ifndef MSG_ZEROCOPY
0071 #define MSG_ZEROCOPY 0x4000000
0072 #endif
0073
/* Total number of bytes the sender loop in main() transmits (1 << 35 = 32 GiB). */
0074 #define FILE_SZ (1ULL << 35)
/* Socket family; IPv6 unless -4 is given on the command line. */
0075 static int cfg_family = AF_INET6;
/* Address length handed to bind()/connect(); kept in sync with cfg_family. */
0076 static socklen_t cfg_alen = sizeof(struct sockaddr_in6);
/* TCP port (host byte order), overridable with -p. */
0077 static int cfg_port = 8787;
0078
/* SO_RCVBUF value from -r; zero means "do not set". */
0079 static int rcvbuf;
/* SO_SNDBUF value from -w; zero means "do not set". */
0080 static int sndbuf;
/* Set by -z: zerocopy receive in the server threads, MSG_ZEROCOPY in the sender. */
0081 static int zflg;
/* Set by -x: fold every received byte into htotal via hash_zone(). */
0082 static int xflg;
/* Set by -k; NOTE(review): not read anywhere in the code visible here. */
0083 static int keepflag;
0084
/* Bytes per send()/read() and size of the zerocopy window; default 512 KiB, set by -C. */
0085 static size_t chunk_size = 512*1024;
0086
/* Alignment used for the large mappings; set by -a, otherwise chosen in main(). */
0087 static size_t map_align;
0088
/* Running XOR checksum of every byte passed to hash_zone(). */
unsigned long htotal;

/*
 * Hint the CPU to fetch the cache line holding @x.
 * Only implemented for x86-64; compiles to nothing elsewhere.
 * __asm__/__volatile__ are used so the code also builds in strict ISO modes.
 */
static inline void prefetch(const void *x)
{
#if defined(__x86_64__)
	__asm__ __volatile__("prefetcht0 %P0" : : "m" (*(const char *)x));
#else
	(void)x;
#endif
}

/*
 * Fold @length bytes starting at @zone into the global htotal.
 *
 * The bulk of the data is consumed eight longs at a time, each long being
 * XORed into the accumulator; whatever is left (fewer than 8 * sizeof(long)
 * bytes) is XORed in byte by byte.
 *
 * Words are loaded with memcpy() so that @zone may have any alignment and no
 * object is accessed through an incompatible pointer type.
 *
 * htotal is read once on entry and written once on exit without any locking.
 */
void hash_zone(void *zone, unsigned int length)
{
	const unsigned char *cursor = zone;
	unsigned long temp = htotal;
	unsigned long word;
	unsigned int i;

	while (length >= 8 * sizeof(long)) {
		/* Warm the cache a few lines ahead of the block being hashed. */
		prefetch(cursor + 384);
		for (i = 0; i < 8; i++) {
			memcpy(&word, cursor + i * sizeof(long), sizeof(word));
			temp ^= word;
		}
		cursor += 8 * sizeof(long);
		length -= 8 * sizeof(long);
	}
	while (length >= 1) {
		temp ^= *cursor++;
		length--;
	}
	htotal = temp;
}
0122
/*
 * Round @x up to the next multiple of @align_to.
 * The mask arithmetic is only correct when @align_to is a power of two.
 */
#define ALIGN_UP(x, align_to)	(((x) + ((align_to) - 1)) & ~((align_to) - 1))

/*
 * Pointer flavour of ALIGN_UP(): the result has the same type as @p.
 * __typeof__ is spelled with underscores so the macro also works when the
 * compiler runs in a strict ISO mode where plain typeof is not a keyword.
 */
#define ALIGN_PTR_UP(p, ptr_align_to)	\
	((__typeof__(p))ALIGN_UP((unsigned long)(p), ptr_align_to))
0125
0126
0127 static void *mmap_large_buffer(size_t need, size_t *allocated)
0128 {
0129 void *buffer;
0130 size_t sz;
0131
0132
0133 sz = ALIGN_UP(need, map_align);
0134 buffer = mmap(NULL, sz, PROT_READ | PROT_WRITE,
0135 MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
0136
0137 if (buffer == (void *)-1) {
0138 sz = need;
0139 buffer = mmap(NULL, sz, PROT_READ | PROT_WRITE,
0140 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
0141 if (buffer != (void *)-1)
0142 fprintf(stderr, "MAP_HUGETLB attempt failed, look at /sys/kernel/mm/hugepages for optimal performance\n");
0143 }
0144 *allocated = sz;
0145 return buffer;
0146 }
0147
0148 void *child_thread(void *arg)
0149 {
0150 unsigned long total_mmap = 0, total = 0;
0151 struct tcp_zerocopy_receive zc;
0152 unsigned long delta_usec;
0153 int flags = MAP_SHARED;
0154 struct timeval t0, t1;
0155 char *buffer = NULL;
0156 void *raddr = NULL;
0157 void *addr = NULL;
0158 double throughput;
0159 struct rusage ru;
0160 size_t buffer_sz;
0161 int lu, fd;
0162
0163 fd = (int)(unsigned long)arg;
0164
0165 gettimeofday(&t0, NULL);
0166
0167 fcntl(fd, F_SETFL, O_NDELAY);
0168 buffer = mmap_large_buffer(chunk_size, &buffer_sz);
0169 if (buffer == (void *)-1) {
0170 perror("mmap");
0171 goto error;
0172 }
0173 if (zflg) {
0174 raddr = mmap(NULL, chunk_size + map_align, PROT_READ, flags, fd, 0);
0175 if (raddr == (void *)-1) {
0176 perror("mmap");
0177 zflg = 0;
0178 } else {
0179 addr = ALIGN_PTR_UP(raddr, map_align);
0180 }
0181 }
0182 while (1) {
0183 struct pollfd pfd = { .fd = fd, .events = POLLIN, };
0184 int sub;
0185
0186 poll(&pfd, 1, 10000);
0187 if (zflg) {
0188 socklen_t zc_len = sizeof(zc);
0189 int res;
0190
0191 memset(&zc, 0, sizeof(zc));
0192 zc.address = (__u64)((unsigned long)addr);
0193 zc.length = chunk_size;
0194
0195 res = getsockopt(fd, IPPROTO_TCP, TCP_ZEROCOPY_RECEIVE,
0196 &zc, &zc_len);
0197 if (res == -1)
0198 break;
0199
0200 if (zc.length) {
0201 assert(zc.length <= chunk_size);
0202 total_mmap += zc.length;
0203 if (xflg)
0204 hash_zone(addr, zc.length);
0205
0206
0207
0208 madvise(addr, zc.length, MADV_DONTNEED);
0209 total += zc.length;
0210 }
0211 if (zc.recv_skip_hint) {
0212 assert(zc.recv_skip_hint <= chunk_size);
0213 lu = read(fd, buffer, zc.recv_skip_hint);
0214 if (lu > 0) {
0215 if (xflg)
0216 hash_zone(buffer, lu);
0217 total += lu;
0218 }
0219 }
0220 continue;
0221 }
0222 sub = 0;
0223 while (sub < chunk_size) {
0224 lu = read(fd, buffer + sub, chunk_size - sub);
0225 if (lu == 0)
0226 goto end;
0227 if (lu < 0)
0228 break;
0229 if (xflg)
0230 hash_zone(buffer + sub, lu);
0231 total += lu;
0232 sub += lu;
0233 }
0234 }
0235 end:
0236 gettimeofday(&t1, NULL);
0237 delta_usec = (t1.tv_sec - t0.tv_sec) * 1000000 + t1.tv_usec - t0.tv_usec;
0238
0239 throughput = 0;
0240 if (delta_usec)
0241 throughput = total * 8.0 / (double)delta_usec / 1000.0;
0242 getrusage(RUSAGE_THREAD, &ru);
0243 if (total > 1024*1024) {
0244 unsigned long total_usec;
0245 unsigned long mb = total >> 20;
0246 total_usec = 1000000*ru.ru_utime.tv_sec + ru.ru_utime.tv_usec +
0247 1000000*ru.ru_stime.tv_sec + ru.ru_stime.tv_usec;
0248 printf("received %lg MB (%lg %% mmap'ed) in %lg s, %lg Gbit\n"
0249 " cpu usage user:%lg sys:%lg, %lg usec per MB, %lu c-switches\n",
0250 total / (1024.0 * 1024.0),
0251 100.0*total_mmap/total,
0252 (double)delta_usec / 1000000.0,
0253 throughput,
0254 (double)ru.ru_utime.tv_sec + (double)ru.ru_utime.tv_usec / 1000000.0,
0255 (double)ru.ru_stime.tv_sec + (double)ru.ru_stime.tv_usec / 1000000.0,
0256 (double)total_usec/mb,
0257 ru.ru_nvcsw);
0258 }
0259 error:
0260 munmap(buffer, buffer_sz);
0261 close(fd);
0262 if (zflg)
0263 munmap(raddr, chunk_size + map_align);
0264 pthread_exit(0);
0265 }
0266
0267 static void apply_rcvsnd_buf(int fd)
0268 {
0269 if (rcvbuf && setsockopt(fd, SOL_SOCKET,
0270 SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) == -1) {
0271 perror("setsockopt SO_RCVBUF");
0272 }
0273
0274 if (sndbuf && setsockopt(fd, SOL_SOCKET,
0275 SO_SNDBUF, &sndbuf, sizeof(sndbuf)) == -1) {
0276 perror("setsockopt SO_SNDBUF");
0277 }
0278 }
0279
0280
0281 static void setup_sockaddr(int domain, const char *str_addr,
0282 struct sockaddr_storage *sockaddr)
0283 {
0284 struct sockaddr_in6 *addr6 = (void *) sockaddr;
0285 struct sockaddr_in *addr4 = (void *) sockaddr;
0286
0287 switch (domain) {
0288 case PF_INET:
0289 memset(addr4, 0, sizeof(*addr4));
0290 addr4->sin_family = AF_INET;
0291 addr4->sin_port = htons(cfg_port);
0292 if (str_addr &&
0293 inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
0294 error(1, 0, "ipv4 parse error: %s", str_addr);
0295 break;
0296 case PF_INET6:
0297 memset(addr6, 0, sizeof(*addr6));
0298 addr6->sin6_family = AF_INET6;
0299 addr6->sin6_port = htons(cfg_port);
0300 if (str_addr &&
0301 inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
0302 error(1, 0, "ipv6 parse error: %s", str_addr);
0303 break;
0304 default:
0305 error(1, 0, "illegal domain");
0306 }
0307 }
0308
0309 static void do_accept(int fdlisten)
0310 {
0311 pthread_attr_t attr;
0312 int rcvlowat;
0313
0314 pthread_attr_init(&attr);
0315 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
0316
0317 rcvlowat = chunk_size;
0318 if (setsockopt(fdlisten, SOL_SOCKET, SO_RCVLOWAT,
0319 &rcvlowat, sizeof(rcvlowat)) == -1) {
0320 perror("setsockopt SO_RCVLOWAT");
0321 }
0322
0323 apply_rcvsnd_buf(fdlisten);
0324
0325 while (1) {
0326 struct sockaddr_in addr;
0327 socklen_t addrlen = sizeof(addr);
0328 pthread_t th;
0329 int fd, res;
0330
0331 fd = accept(fdlisten, (struct sockaddr *)&addr, &addrlen);
0332 if (fd == -1) {
0333 perror("accept");
0334 continue;
0335 }
0336 res = pthread_create(&th, &attr, child_thread,
0337 (void *)(unsigned long)fd);
0338 if (res) {
0339 errno = res;
0340 perror("pthread_create");
0341 close(fd);
0342 }
0343 }
0344 }
0345
0346
0347
0348
0349
/*
 * Look up the "Hugepagesize:" entry of /proc/meminfo.
 *
 * Returns the value converted from kB to bytes, or 0 when the file cannot be
 * opened or holds no such line.
 */
static unsigned long default_huge_page_size(void)
{
	unsigned long size_kb = 0;
	unsigned long bytes = 0;
	char *text = NULL;
	size_t cap = 0;
	FILE *meminfo;

	meminfo = fopen("/proc/meminfo", "r");
	if (meminfo == NULL)
		return 0;

	for (;;) {
		if (getline(&text, &cap, meminfo) <= 0)
			break;
		if (sscanf(text, "Hugepagesize: %lu kB", &size_kb) != 1)
			continue;
		bytes = size_kb << 10;	/* kB -> bytes */
		break;
	}

	free(text);
	fclose(meminfo);
	return bytes;
}
0369
0370 int main(int argc, char *argv[])
0371 {
0372 struct sockaddr_storage listenaddr, addr;
0373 unsigned int max_pacing_rate = 0;
0374 uint64_t total = 0;
0375 char *host = NULL;
0376 int fd, c, on = 1;
0377 size_t buffer_sz;
0378 char *buffer;
0379 int sflg = 0;
0380 int mss = 0;
0381
0382 while ((c = getopt(argc, argv, "46p:svr:w:H:zxkP:M:C:a:")) != -1) {
0383 switch (c) {
0384 case '4':
0385 cfg_family = PF_INET;
0386 cfg_alen = sizeof(struct sockaddr_in);
0387 break;
0388 case '6':
0389 cfg_family = PF_INET6;
0390 cfg_alen = sizeof(struct sockaddr_in6);
0391 break;
0392 case 'p':
0393 cfg_port = atoi(optarg);
0394 break;
0395 case 'H':
0396 host = optarg;
0397 break;
0398 case 's':
0399 sflg++;
0400 break;
0401 case 'r':
0402 rcvbuf = atoi(optarg);
0403 break;
0404 case 'w':
0405 sndbuf = atoi(optarg);
0406 break;
0407 case 'z':
0408 zflg = 1;
0409 break;
0410 case 'M':
0411 mss = atoi(optarg);
0412 break;
0413 case 'x':
0414 xflg = 1;
0415 break;
0416 case 'k':
0417 keepflag = 1;
0418 break;
0419 case 'P':
0420 max_pacing_rate = atoi(optarg) ;
0421 break;
0422 case 'C':
0423 chunk_size = atol(optarg);
0424 break;
0425 case 'a':
0426 map_align = atol(optarg);
0427 break;
0428 default:
0429 exit(1);
0430 }
0431 }
0432 if (!map_align) {
0433 map_align = default_huge_page_size();
0434
0435
0436
0437 if (!map_align)
0438 map_align = 2*1024*1024;
0439 }
0440 if (sflg) {
0441 int fdlisten = socket(cfg_family, SOCK_STREAM, 0);
0442
0443 if (fdlisten == -1) {
0444 perror("socket");
0445 exit(1);
0446 }
0447 apply_rcvsnd_buf(fdlisten);
0448 setsockopt(fdlisten, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
0449
0450 setup_sockaddr(cfg_family, host, &listenaddr);
0451
0452 if (mss &&
0453 setsockopt(fdlisten, IPPROTO_TCP, TCP_MAXSEG,
0454 &mss, sizeof(mss)) == -1) {
0455 perror("setsockopt TCP_MAXSEG");
0456 exit(1);
0457 }
0458 if (bind(fdlisten, (const struct sockaddr *)&listenaddr, cfg_alen) == -1) {
0459 perror("bind");
0460 exit(1);
0461 }
0462 if (listen(fdlisten, 128) == -1) {
0463 perror("listen");
0464 exit(1);
0465 }
0466 do_accept(fdlisten);
0467 }
0468
0469 buffer = mmap_large_buffer(chunk_size, &buffer_sz);
0470 if (buffer == (char *)-1) {
0471 perror("mmap");
0472 exit(1);
0473 }
0474
0475 fd = socket(cfg_family, SOCK_STREAM, 0);
0476 if (fd == -1) {
0477 perror("socket");
0478 exit(1);
0479 }
0480 apply_rcvsnd_buf(fd);
0481
0482 setup_sockaddr(cfg_family, host, &addr);
0483
0484 if (mss &&
0485 setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &mss, sizeof(mss)) == -1) {
0486 perror("setsockopt TCP_MAXSEG");
0487 exit(1);
0488 }
0489 if (connect(fd, (const struct sockaddr *)&addr, cfg_alen) == -1) {
0490 perror("connect");
0491 exit(1);
0492 }
0493 if (max_pacing_rate &&
0494 setsockopt(fd, SOL_SOCKET, SO_MAX_PACING_RATE,
0495 &max_pacing_rate, sizeof(max_pacing_rate)) == -1)
0496 perror("setsockopt SO_MAX_PACING_RATE");
0497
0498 if (zflg && setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY,
0499 &on, sizeof(on)) == -1) {
0500 perror("setsockopt SO_ZEROCOPY, (-z option disabled)");
0501 zflg = 0;
0502 }
0503 while (total < FILE_SZ) {
0504 int64_t wr = FILE_SZ - total;
0505
0506 if (wr > chunk_size)
0507 wr = chunk_size;
0508
0509 wr = send(fd, buffer, (size_t)wr, zflg ? MSG_ZEROCOPY : 0);
0510 if (wr <= 0)
0511 break;
0512 total += wr;
0513 }
0514 close(fd);
0515 munmap(buffer, buffer_sz);
0516 return 0;
0517 }