0001
0002
0003 #define _GNU_SOURCE
0004
0005 #include <arpa/inet.h>
0006 #include <error.h>
0007 #include <errno.h>
0008 #include <limits.h>
0009 #include <linux/errqueue.h>
0010 #include <linux/if_packet.h>
0011 #include <linux/socket.h>
0012 #include <linux/sockios.h>
0013 #include <net/ethernet.h>
0014 #include <net/if.h>
0015 #include <netinet/ip.h>
0016 #include <netinet/ip6.h>
0017 #include <netinet/tcp.h>
0018 #include <netinet/udp.h>
0019 #include <poll.h>
0020 #include <sched.h>
0021 #include <stdbool.h>
0022 #include <stdio.h>
0023 #include <stdint.h>
0024 #include <stdlib.h>
0025 #include <string.h>
0026 #include <sys/ioctl.h>
0027 #include <sys/socket.h>
0028 #include <sys/stat.h>
0029 #include <sys/time.h>
0030 #include <sys/types.h>
0031 #include <sys/wait.h>
0032 #include <unistd.h>
0033
0034 #ifndef UDP_GRO
0035 #define UDP_GRO 104
0036 #endif
0037
0038 static int cfg_port = 8000;
0039 static bool cfg_tcp;
0040 static bool cfg_verify;
0041 static bool cfg_read_all;
0042 static bool cfg_gro_segment;
0043 static int cfg_family = PF_INET6;
0044 static int cfg_alen = sizeof(struct sockaddr_in6);
0045 static int cfg_expected_pkt_nr;
0046 static int cfg_expected_pkt_len;
0047 static int cfg_expected_gso_size;
0048 static int cfg_connect_timeout_ms;
0049 static int cfg_rcv_timeout_ms;
0050 static struct sockaddr_storage cfg_bind_addr;
0051
0052 static bool interrupted;
0053 static unsigned long packets, bytes;
0054
0055 static void sigint_handler(int signum)
0056 {
0057 if (signum == SIGINT)
0058 interrupted = true;
0059 }
0060
0061 static void setup_sockaddr(int domain, const char *str_addr, void *sockaddr)
0062 {
0063 struct sockaddr_in6 *addr6 = (void *) sockaddr;
0064 struct sockaddr_in *addr4 = (void *) sockaddr;
0065
0066 switch (domain) {
0067 case PF_INET:
0068 addr4->sin_family = AF_INET;
0069 addr4->sin_port = htons(cfg_port);
0070 if (inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
0071 error(1, 0, "ipv4 parse error: %s", str_addr);
0072 break;
0073 case PF_INET6:
0074 addr6->sin6_family = AF_INET6;
0075 addr6->sin6_port = htons(cfg_port);
0076 if (inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
0077 error(1, 0, "ipv6 parse error: %s", str_addr);
0078 break;
0079 default:
0080 error(1, 0, "illegal domain");
0081 }
0082 }
0083
0084 static unsigned long gettimeofday_ms(void)
0085 {
0086 struct timeval tv;
0087
0088 gettimeofday(&tv, NULL);
0089 return (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
0090 }
0091
0092 static void do_poll(int fd, int timeout_ms)
0093 {
0094 struct pollfd pfd;
0095 int ret;
0096
0097 pfd.events = POLLIN;
0098 pfd.revents = 0;
0099 pfd.fd = fd;
0100
0101 do {
0102 ret = poll(&pfd, 1, 10);
0103 if (interrupted)
0104 break;
0105 if (ret == -1)
0106 error(1, errno, "poll");
0107 if (ret == 0) {
0108 if (!timeout_ms)
0109 continue;
0110
0111 timeout_ms -= 10;
0112 if (timeout_ms <= 0) {
0113 interrupted = true;
0114 break;
0115 }
0116
0117
0118 continue;
0119 }
0120 if (pfd.revents != POLLIN)
0121 error(1, errno, "poll: 0x%x expected 0x%x\n",
0122 pfd.revents, POLLIN);
0123 } while (!ret);
0124 }
0125
0126 static int do_socket(bool do_tcp)
0127 {
0128 int fd, val;
0129
0130 fd = socket(cfg_family, cfg_tcp ? SOCK_STREAM : SOCK_DGRAM, 0);
0131 if (fd == -1)
0132 error(1, errno, "socket");
0133
0134 val = 1 << 21;
0135 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)))
0136 error(1, errno, "setsockopt rcvbuf");
0137 val = 1;
0138 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)))
0139 error(1, errno, "setsockopt reuseport");
0140
0141 if (bind(fd, (void *)&cfg_bind_addr, cfg_alen))
0142 error(1, errno, "bind");
0143
0144 if (do_tcp) {
0145 int accept_fd = fd;
0146
0147 if (listen(accept_fd, 1))
0148 error(1, errno, "listen");
0149
0150 do_poll(accept_fd, cfg_connect_timeout_ms);
0151 if (interrupted)
0152 exit(0);
0153
0154 fd = accept(accept_fd, NULL, NULL);
0155 if (fd == -1)
0156 error(1, errno, "accept");
0157 if (close(accept_fd))
0158 error(1, errno, "close accept fd");
0159 }
0160
0161 return fd;
0162 }
0163
0164
0165 static void do_flush_tcp(int fd)
0166 {
0167 int ret;
0168
0169 while (true) {
0170
0171 ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
0172 if (ret == -1 && errno == EAGAIN)
0173 return;
0174 if (ret == -1)
0175 error(1, errno, "flush");
0176 if (ret == 0) {
0177
0178 exit(0);
0179 }
0180
0181 packets++;
0182 bytes += ret;
0183 }
0184
0185 }
0186
0187 static char sanitized_char(char val)
0188 {
0189 return (val >= 'a' && val <= 'z') ? val : '.';
0190 }
0191
0192 static void do_verify_udp(const char *data, int len)
0193 {
0194 char cur = data[0];
0195 int i;
0196
0197
0198 if (cur < 'a' || cur > 'z')
0199 error(1, 0, "data initial byte out of range");
0200
0201 for (i = 1; i < len; i++) {
0202 if (cur == 'z')
0203 cur = 'a';
0204 else
0205 cur++;
0206
0207 if (data[i] != cur)
0208 error(1, 0, "data[%d]: len %d, %c(%hhu) != %c(%hhu)\n",
0209 i, len,
0210 sanitized_char(data[i]), data[i],
0211 sanitized_char(cur), cur);
0212 }
0213 }
0214
0215 static int recv_msg(int fd, char *buf, int len, int *gso_size)
0216 {
0217 char control[CMSG_SPACE(sizeof(uint16_t))] = {0};
0218 struct msghdr msg = {0};
0219 struct iovec iov = {0};
0220 struct cmsghdr *cmsg;
0221 uint16_t *gsosizeptr;
0222 int ret;
0223
0224 iov.iov_base = buf;
0225 iov.iov_len = len;
0226
0227 msg.msg_iov = &iov;
0228 msg.msg_iovlen = 1;
0229
0230 msg.msg_control = control;
0231 msg.msg_controllen = sizeof(control);
0232
0233 *gso_size = -1;
0234 ret = recvmsg(fd, &msg, MSG_TRUNC | MSG_DONTWAIT);
0235 if (ret != -1) {
0236 for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
0237 cmsg = CMSG_NXTHDR(&msg, cmsg)) {
0238 if (cmsg->cmsg_level == SOL_UDP
0239 && cmsg->cmsg_type == UDP_GRO) {
0240 gsosizeptr = (uint16_t *) CMSG_DATA(cmsg);
0241 *gso_size = *gsosizeptr;
0242 break;
0243 }
0244 }
0245 }
0246 return ret;
0247 }
0248
0249
0250 static void do_flush_udp(int fd)
0251 {
0252 static char rbuf[ETH_MAX_MTU];
0253 int ret, len, gso_size, budget = 256;
0254
0255 len = cfg_read_all ? sizeof(rbuf) : 0;
0256 while (budget--) {
0257
0258 if (!cfg_expected_gso_size)
0259 ret = recv(fd, rbuf, len, MSG_TRUNC | MSG_DONTWAIT);
0260 else
0261 ret = recv_msg(fd, rbuf, len, &gso_size);
0262 if (ret == -1 && errno == EAGAIN)
0263 break;
0264 if (ret == -1)
0265 error(1, errno, "recv");
0266 if (cfg_expected_pkt_len && ret != cfg_expected_pkt_len)
0267 error(1, 0, "recv: bad packet len, got %d,"
0268 " expected %d\n", ret, cfg_expected_pkt_len);
0269 if (len && cfg_verify) {
0270 if (ret == 0)
0271 error(1, errno, "recv: 0 byte datagram\n");
0272
0273 do_verify_udp(rbuf, ret);
0274 }
0275 if (cfg_expected_gso_size && cfg_expected_gso_size != gso_size)
0276 error(1, 0, "recv: bad gso size, got %d, expected %d "
0277 "(-1 == no gso cmsg))\n", gso_size,
0278 cfg_expected_gso_size);
0279
0280 packets++;
0281 bytes += ret;
0282 if (cfg_expected_pkt_nr && packets >= cfg_expected_pkt_nr)
0283 break;
0284 }
0285 }
0286
0287 static void usage(const char *filepath)
0288 {
0289 error(1, 0, "Usage: %s [-C connect_timeout] [-Grtv] [-b addr] [-p port]"
0290 " [-l pktlen] [-n packetnr] [-R rcv_timeout] [-S gsosize]",
0291 filepath);
0292 }
0293
0294 static void parse_opts(int argc, char **argv)
0295 {
0296 const char *bind_addr = NULL;
0297 int c;
0298
0299 while ((c = getopt(argc, argv, "4b:C:Gl:n:p:rR:S:tv")) != -1) {
0300 switch (c) {
0301 case '4':
0302 cfg_family = PF_INET;
0303 cfg_alen = sizeof(struct sockaddr_in);
0304 break;
0305 case 'b':
0306 bind_addr = optarg;
0307 break;
0308 case 'C':
0309 cfg_connect_timeout_ms = strtoul(optarg, NULL, 0);
0310 break;
0311 case 'G':
0312 cfg_gro_segment = true;
0313 break;
0314 case 'l':
0315 cfg_expected_pkt_len = strtoul(optarg, NULL, 0);
0316 break;
0317 case 'n':
0318 cfg_expected_pkt_nr = strtoul(optarg, NULL, 0);
0319 break;
0320 case 'p':
0321 cfg_port = strtoul(optarg, NULL, 0);
0322 break;
0323 case 'r':
0324 cfg_read_all = true;
0325 break;
0326 case 'R':
0327 cfg_rcv_timeout_ms = strtoul(optarg, NULL, 0);
0328 break;
0329 case 'S':
0330 cfg_expected_gso_size = strtol(optarg, NULL, 0);
0331 break;
0332 case 't':
0333 cfg_tcp = true;
0334 break;
0335 case 'v':
0336 cfg_verify = true;
0337 cfg_read_all = true;
0338 break;
0339 }
0340 }
0341
0342 if (!bind_addr)
0343 bind_addr = cfg_family == PF_INET6 ? "::" : "0.0.0.0";
0344
0345 setup_sockaddr(cfg_family, bind_addr, &cfg_bind_addr);
0346
0347 if (optind != argc)
0348 usage(argv[0]);
0349
0350 if (cfg_tcp && cfg_verify)
0351 error(1, 0, "TODO: implement verify mode for tcp");
0352 }
0353
0354 static void do_recv(void)
0355 {
0356 int timeout_ms = cfg_tcp ? cfg_rcv_timeout_ms : cfg_connect_timeout_ms;
0357 unsigned long tnow, treport;
0358 int fd;
0359
0360 fd = do_socket(cfg_tcp);
0361
0362 if (cfg_gro_segment && !cfg_tcp) {
0363 int val = 1;
0364 if (setsockopt(fd, IPPROTO_UDP, UDP_GRO, &val, sizeof(val)))
0365 error(1, errno, "setsockopt UDP_GRO");
0366 }
0367
0368 treport = gettimeofday_ms() + 1000;
0369 do {
0370 do_poll(fd, timeout_ms);
0371
0372 if (cfg_tcp)
0373 do_flush_tcp(fd);
0374 else
0375 do_flush_udp(fd);
0376
0377 tnow = gettimeofday_ms();
0378 if (tnow > treport) {
0379 if (packets)
0380 fprintf(stderr,
0381 "%s rx: %6lu MB/s %8lu calls/s\n",
0382 cfg_tcp ? "tcp" : "udp",
0383 bytes >> 20, packets);
0384 bytes = packets = 0;
0385 treport = tnow + 1000;
0386 }
0387
0388 timeout_ms = cfg_rcv_timeout_ms;
0389
0390 } while (!interrupted);
0391
0392 if (cfg_expected_pkt_nr && (packets != cfg_expected_pkt_nr))
0393 error(1, 0, "wrong packet number! got %ld, expected %d\n",
0394 packets, cfg_expected_pkt_nr);
0395
0396 if (close(fd))
0397 error(1, errno, "close");
0398 }
0399
0400 int main(int argc, char **argv)
0401 {
0402 parse_opts(argc, argv);
0403
0404 signal(SIGINT, sigint_handler);
0405
0406 do_recv();
0407
0408 return 0;
0409 }