0001
0002
0003 #define _GNU_SOURCE
0004
0005 #include <errno.h>
0006 #include <limits.h>
0007 #include <fcntl.h>
0008 #include <string.h>
0009 #include <stdarg.h>
0010 #include <stdbool.h>
0011 #include <stdint.h>
0012 #include <stdio.h>
0013 #include <stdlib.h>
0014 #include <strings.h>
0015 #include <signal.h>
0016 #include <unistd.h>
0017 #include <time.h>
0018
0019 #include <sys/ioctl.h>
0020 #include <sys/poll.h>
0021 #include <sys/sendfile.h>
0022 #include <sys/stat.h>
0023 #include <sys/socket.h>
0024 #include <sys/types.h>
0025 #include <sys/mman.h>
0026
0027 #include <netdb.h>
0028 #include <netinet/in.h>
0029
0030 #include <linux/tcp.h>
0031 #include <linux/time_types.h>
0032 #include <linux/sockios.h>
0033
/* getopt() index of the first non-option argument; read in parse_opts(). */
extern int optind;

/* Fallback values, used only when the included headers do not define them. */
#ifndef IPPROTO_MPTCP
#define IPPROTO_MPTCP 262
#endif
#ifndef TCP_ULP
#define TCP_ULP 31
#endif
0042
/* Timeout handed to poll(), in milliseconds (-t takes seconds). */
static int poll_timeout = 10 * 1000;
/* Set by -l: accept incoming connections instead of connecting. */
static bool listen_mode;
/* Set by the SIGUSR1 handler; no reader is visible in this file. */
static bool quit;
0046
/* Transfer strategy dispatched on by copyfd_io(); selected with -m. */
enum cfg_mode {
	CFG_MODE_POLL,		/* interleaved read/write driven by poll() */
	CFG_MODE_MMAP,		/* mmap() the input file, then write() it */
	CFG_MODE_SENDFILE,	/* push the input file with sendfile() */
};
0052
/* How do_rnd_read() combines MSG_PEEK with the real read; selected with -P. */
enum cfg_peek {
	CFG_NONE_PEEK,	/* no MSG_PEEK is used */
	CFG_WITH_PEEK,	/* keep the peeked bytes; drain the same amount into a scratch buffer */
	CFG_AFTER_PEEK,	/* peek first, then overwrite the buffer with a normal read */
};
0058
/* Transfer strategy (-m); -j and -r force CFG_MODE_POLL. */
static enum cfg_mode cfg_mode = CFG_MODE_POLL;
/* MSG_PEEK behaviour for reads (-P). */
static enum cfg_peek cfg_peek = CFG_NONE_PEEK;
/* Positional argument: address to connect to, or to listen on with -l. */
static const char *cfg_host;
/* Service string handed to getaddrinfo() (-p). */
static const char *cfg_port = "12000";
/* Protocol passed to socket(): IPPROTO_MPTCP or IPPROTO_TCP (-s). */
static int cfg_sock_proto = IPPROTO_MPTCP;
/* Address family; becomes AF_INET6 with -6 or when the host contains ':'. */
static int pf = AF_INET;
/* SO_SNDBUF value (-S); 0 leaves the socket untouched. */
static int cfg_sndbuf;
/* SO_RCVBUF value (-R); 0 leaves the socket untouched. */
static int cfg_rcvbuf;
/* -j: the first write is capped at 100 bytes and followed by a 200 ms sleep. */
static bool cfg_join;
/* -r: every write is capped at cfg_do_w bytes and followed by a 200 ms sleep. */
static bool cfg_remove;
/* -T: maximum accepted transfer time in milliseconds; 0 disables the check. */
static unsigned int cfg_time;
/* Per-write byte cap applied when cfg_remove is set (-r argument; 0 is replaced by 50). */
static unsigned int cfg_do_w;
/* Delay passed to usleep() (microseconds) before the write side is shut down. */
static int cfg_wait;
/* SO_MARK value (-M); 0 means no mark is set. */
static uint32_t cfg_mark;
/* Path of the file to send (-i); when NULL, fd 0 is used as input. */
static char *cfg_input;
/* Number of transfers to perform (-I). */
static int cfg_repeat = 1;
0075
/* Ancillary-data checks requested with -c. */
struct cfg_cmsg_types {
	unsigned int cmsg_enabled:1;	/* set as soon as a -c argument is parsed */
	unsigned int timestampns:1;	/* "TIMESTAMPNS": enable and require SO_TIMESTAMPNS_NEW */
	unsigned int tcp_inq:1;		/* "TCPINQ": enable TCP_INQ and require TCP_CM_INQ */
};
0081
/* Extra socket options requested with -o. */
struct cfg_sockopt_types {
	unsigned int transparent:1;	/* "TRANSPARENT": set IP(V6)_TRANSPARENT on the listener */
};
0085
/* Bookkeeping for the TCP_INQ consistency checks in do_recvmsg_cmsg(). */
struct tcp_inq_state {
	unsigned int last;	/* value of the most recent TCP_CM_INQ control message */
	bool expect_eof;	/* set once the next receive is required to return EOF */
};
0090
/* TCP_INQ tracking shared by process_cmsg() and do_recvmsg_cmsg(). */
static struct tcp_inq_state tcp_inq;

/* Flags filled in by parse_cmsg_types() (-c). */
static struct cfg_cmsg_types cfg_cmsg_types;
/* Flags filled in by parse_setsock_options() (-o). */
static struct cfg_sockopt_types cfg_sockopt_types;
0095
/*
 * Print the command line synopsis and the per-option help to stderr,
 * then terminate the process with exit status 1.
 *
 * The synopsis mirrors the getopt() string used by parse_opts().
 */
static void die_usage(void)
{
	fprintf(stderr, "Usage: mptcp_connect [-6] [-c cmsg] [-h] [-i file] [-I num] [-j] [-l] "
		"[-m mode] [-M mark] [-o option] [-p port] [-P mode] [-r num] [-R num] "
		"[-s MPTCP|TCP] [-S num] [-t num] [-T num] [-w sec] connect_address\n");
	fprintf(stderr, "\t-6 use ipv6\n");
	fprintf(stderr, "\t-c cmsg -- test cmsg type <cmsg>\n");
	fprintf(stderr, "\t-h -- show this help\n");
	fprintf(stderr, "\t-i file -- read the data to send from the given file instead of stdin\n");
	fprintf(stderr, "\t-I num -- repeat the transfer 'num' times. In listen mode accepts num "
		"incoming connections, in client mode, disconnect and reconnect to the server\n");
	fprintf(stderr, "\t-j     -- add additional sleep at connection start and tear down "
		"-- for MPJ tests\n");
	fprintf(stderr, "\t-l     -- listens mode, accepts incoming connection\n");
	fprintf(stderr, "\t-m [poll|mmap|sendfile] -- use poll(default)/mmap+write/sendfile\n");
	fprintf(stderr, "\t-M mark -- set socket packet mark\n");
	fprintf(stderr, "\t-o option -- test sockopt <option>\n");
	fprintf(stderr, "\t-p num -- use port num\n");
	fprintf(stderr,
		"\t-P [saveWithPeek|saveAfterPeek] -- save data with/after MSG_PEEK from tcp socket\n");
	fprintf(stderr, "\t-t num -- set poll timeout to num\n");
	fprintf(stderr, "\t-T num -- set expected runtime to num ms\n");
	fprintf(stderr, "\t-r num -- enable slow mode, limiting each write to num bytes "
		"-- for remove addr tests\n");
	fprintf(stderr, "\t-R num -- set SO_RCVBUF to num\n");
	fprintf(stderr, "\t-s [MPTCP|TCP] -- use mptcp(default) or tcp sockets\n");
	fprintf(stderr, "\t-S num -- set SO_SNDBUF to num\n");
	fprintf(stderr, "\t-w num -- wait num sec before closing the socket\n");
	exit(1);
}
0125
/*
 * Fatal error helper: format the printf-style message to stderr and
 * exit with status 1.  No newline is appended to the message.
 */
static void xerror(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	(void)vfprintf(stderr, fmt, args);
	va_end(args);

	exit(1);
}
0135
0136 static void handle_signal(int nr)
0137 {
0138 quit = true;
0139 }
0140
/*
 * Translate a getaddrinfo()/getnameinfo() error code into text.
 * EAI_SYSTEM means the real cause is in errno, so report that instead.
 */
static const char *getxinfo_strerr(int err)
{
	return err == EAI_SYSTEM ? strerror(errno) : gai_strerror(err);
}
0148
/*
 * getnameinfo() wrapper that always asks for numeric host and service
 * strings and terminates the process (exit status 1) on failure.
 */
static void xgetnameinfo(const struct sockaddr *addr, socklen_t addrlen,
			 char *host, socklen_t hostlen,
			 char *serv, socklen_t servlen)
{
	int err;

	err = getnameinfo(addr, addrlen, host, hostlen, serv, servlen,
			  NI_NUMERICHOST | NI_NUMERICSERV);
	if (!err)
		return;

	fprintf(stderr, "Fatal: getnameinfo: %s\n", getxinfo_strerr(err));
	exit(1);
}
0164
/*
 * getaddrinfo() wrapper that terminates the process (exit status 1)
 * on failure.  NULL node/service are printed as empty strings.
 */
static void xgetaddrinfo(const char *node, const char *service,
			 const struct addrinfo *hints,
			 struct addrinfo **res)
{
	const char *shown_node = node ? node : "";
	const char *shown_service = service ? service : "";
	int err;

	err = getaddrinfo(node, service, hints, res);
	if (!err)
		return;

	fprintf(stderr, "Fatal: getaddrinfo(%s:%s): %s\n",
		shown_node, shown_service, getxinfo_strerr(err));
	exit(1);
}
0179
/* Set SO_RCVBUF on fd to size; exit(1) if the kernel refuses. */
static void set_rcvbuf(int fd, unsigned int size)
{
	if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) == 0)
		return;

	perror("set SO_RCVBUF");
	exit(1);
}
0190
/* Set SO_SNDBUF on fd to size; exit(1) if the kernel refuses. */
static void set_sndbuf(int fd, unsigned int size)
{
	if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) == 0)
		return;

	perror("set SO_SNDBUF");
	exit(1);
}
0201
/* Set SO_MARK on fd to mark; exit(1) if the kernel refuses. */
static void set_mark(int fd, uint32_t mark)
{
	if (setsockopt(fd, SOL_SOCKET, SO_MARK, &mark, sizeof(mark)) == 0)
		return;

	perror("set SO_MARK");
	exit(1);
}
0212
/*
 * Enable the "transparent" option matching the address family pf.
 * Failure is reported with perror() but is not fatal; families other
 * than AF_INET/AF_INET6 are silently ignored.
 */
static void set_transparent(int fd, int pf)
{
	int on = 1;

	if (pf == AF_INET) {
		if (setsockopt(fd, SOL_IP, IP_TRANSPARENT, &on, sizeof(on)) == -1)
			perror("IP_TRANSPARENT");
	} else if (pf == AF_INET6) {
		if (setsockopt(fd, IPPROTO_IPV6, IPV6_TRANSPARENT, &on, sizeof(on)) == -1)
			perror("IPV6_TRANSPARENT");
	}
}
0228
/*
 * Try to attach the upper layer protocol called name to sock.
 * Returns the raw setsockopt() result (0 on success, -1 on error).
 */
static int do_ulp_so(int sock, const char *name)
{
	socklen_t namelen = strlen(name);

	return setsockopt(sock, IPPROTO_TCP, TCP_ULP, name, namelen);
}
0233
/* Abort, naming the failed call plus the protocol and caller line. */
#define X(m)	xerror("%s:%u: %s: failed for proto %d at line %u", __FILE__, __LINE__, (m), proto, line)
/*
 * Check the TCP_ULP behaviour of sock:
 *  - reading TCP_ULP must succeed;
 *  - if a ULP name is reported it must be "mptcp", and attaching "tls"
 *    must not succeed;
 *  - if none is reported and proto is IPPROTO_MPTCP, attaching "tls"
 *    must fail with -1;
 *  - attaching "mptcp" must always fail with -1.
 * Any violation terminates the process through xerror().
 *
 * line is the caller's source line (see SOCK_TEST_TCPULP).
 */
static void sock_test_tcpulp(int sock, int proto, unsigned int line)
{
	char ulp[8] = "";
	socklen_t ulp_len = 8;

	if (getsockopt(sock, IPPROTO_TCP, TCP_ULP, ulp, &ulp_len) != 0)
		X("getsockopt");

	if (ulp_len > 0) {
		if (strcmp(ulp, "mptcp") != 0)
			xerror("unexpected ULP '%s' for proto %d at line %u", ulp, proto, line);
		if (do_ulp_so(sock, "tls") == 0)
			X("setsockopt");
	} else if (proto == IPPROTO_MPTCP) {
		if (do_ulp_so(sock, "tls") != -1)
			X("setsockopt");
	}

	if (do_ulp_so(sock, "mptcp") != -1)
		X("setsockopt");

#undef X
}

/* Run sock_test_tcpulp() and record the line of the call site. */
#define SOCK_TEST_TCPULP(s, p) sock_test_tcpulp((s), (p), __LINE__)
0264
/*
 * Create a listening socket bound to listenaddr:port.
 *
 * Each address returned by getaddrinfo() is tried in turn with the
 * configured protocol (cfg_sock_proto) until bind() succeeds.  The ULP
 * checks are run after socket(), after bind() and after listen().
 *
 * Returns the listening fd, or a negative value when no socket could
 * be created/bound or listen() failed.
 */
static int sock_listen_mptcp(const char * const listenaddr,
			     const char * const port)
{
	struct addrinfo hints = {
		.ai_protocol = IPPROTO_TCP,
		.ai_socktype = SOCK_STREAM,
		.ai_flags = AI_PASSIVE | AI_NUMERICHOST,
		.ai_family = pf,
	};
	struct addrinfo *a, *addr;
	int sock = -1;
	int one = 1;

	xgetaddrinfo(listenaddr, port, &hints, &addr);

	for (a = addr; a; a = a->ai_next) {
		sock = socket(a->ai_family, a->ai_socktype, cfg_sock_proto);
		if (sock < 0)
			continue;

		SOCK_TEST_TCPULP(sock, cfg_sock_proto);

		/* best effort: a failure here is reported but not fatal */
		if (-1 == setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one,
				     sizeof(one)))
			perror("setsockopt");

		if (cfg_sockopt_types.transparent)
			set_transparent(sock, pf);

		if (bind(sock, a->ai_addr, a->ai_addrlen) == 0)
			break;

		perror("bind");
		close(sock);
		sock = -1;
	}

	freeaddrinfo(addr);

	if (sock < 0) {
		fprintf(stderr, "Could not create listen socket\n");
		return sock;
	}

	SOCK_TEST_TCPULP(sock, cfg_sock_proto);

	if (listen(sock, 20)) {
		perror("listen");
		close(sock);
		return -1;
	}

	SOCK_TEST_TCPULP(sock, cfg_sock_proto);

	return sock;
}
0324
/*
 * Connect to remoteaddr:port using protocol proto.
 *
 * Each address returned by getaddrinfo() is tried until connect()
 * succeeds.  On success *peer is set to a description of the address
 * that was used; callers read ai_addr and ai_addrlen from it to
 * reconnect later.
 *
 * The getaddrinfo() result list is released before returning, so *peer
 * must not point into it: the selected entry is copied into static
 * storage owned by this function.  A later successful call overwrites
 * that storage.
 *
 * Returns the connected fd, or -1 when no address could be reached
 * (in which case *peer is left untouched).
 */
static int sock_connect_mptcp(const char * const remoteaddr,
			      const char * const port, int proto,
			      struct addrinfo **peer)
{
	static struct sockaddr_storage peer_addr;
	static struct addrinfo peer_info;
	struct addrinfo hints = {
		.ai_protocol = IPPROTO_TCP,
		.ai_socktype = SOCK_STREAM,
	};
	struct addrinfo *a, *addr;
	int sock = -1;

	hints.ai_family = pf;

	xgetaddrinfo(remoteaddr, port, &hints, &addr);
	for (a = addr; a; a = a->ai_next) {
		sock = socket(a->ai_family, a->ai_socktype, proto);
		if (sock < 0) {
			perror("socket");
			continue;
		}

		SOCK_TEST_TCPULP(sock, proto);

		if (cfg_mark)
			set_mark(sock, cfg_mark);

		if (connect(sock, a->ai_addr, a->ai_addrlen) == 0) {
			if (a->ai_addrlen > sizeof(peer_addr))
				xerror("peer address too large: %u\n",
				       (unsigned int)a->ai_addrlen);

			/* deep copy: 'a' dies with freeaddrinfo() below */
			memcpy(&peer_addr, a->ai_addr, a->ai_addrlen);
			peer_info = *a;
			peer_info.ai_addr = (struct sockaddr *)&peer_addr;
			peer_info.ai_canonname = NULL;
			peer_info.ai_next = NULL;
			*peer = &peer_info;
			break;
		}

		perror("connect()");
		close(sock);
		sock = -1;
	}

	freeaddrinfo(addr);
	if (sock != -1)
		SOCK_TEST_TCPULP(sock, proto);
	return sock;
}
0366
0367 static size_t do_rnd_write(const int fd, char *buf, const size_t len)
0368 {
0369 static bool first = true;
0370 unsigned int do_w;
0371 ssize_t bw;
0372
0373 do_w = rand() & 0xffff;
0374 if (do_w == 0 || do_w > len)
0375 do_w = len;
0376
0377 if (cfg_join && first && do_w > 100)
0378 do_w = 100;
0379
0380 if (cfg_remove && do_w > cfg_do_w)
0381 do_w = cfg_do_w;
0382
0383 bw = write(fd, buf, do_w);
0384 if (bw < 0)
0385 perror("write");
0386
0387
0388 if (cfg_join && first) {
0389 usleep(200000);
0390 first = false;
0391 }
0392
0393 if (cfg_remove)
0394 usleep(200000);
0395
0396 return bw;
0397 }
0398
/*
 * Write all len bytes of buf to fd, retrying after short writes.
 *
 * Returns len on success.  If write() fails, the error is reported
 * with perror() and 0 is returned, regardless of how much had already
 * been written.
 */
static size_t do_write(const int fd, char *buf, const size_t len)
{
	size_t done;

	for (done = 0; done < len; ) {
		ssize_t n = write(fd, buf + done, len - done);

		if (n < 0) {
			perror("write");
			return 0;
		}

		done += (size_t)n;
	}

	return done;
}
0419
0420 static void process_cmsg(struct msghdr *msgh)
0421 {
0422 struct __kernel_timespec ts;
0423 bool inq_found = false;
0424 bool ts_found = false;
0425 unsigned int inq = 0;
0426 struct cmsghdr *cmsg;
0427
0428 for (cmsg = CMSG_FIRSTHDR(msgh); cmsg ; cmsg = CMSG_NXTHDR(msgh, cmsg)) {
0429 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMPNS_NEW) {
0430 memcpy(&ts, CMSG_DATA(cmsg), sizeof(ts));
0431 ts_found = true;
0432 continue;
0433 }
0434 if (cmsg->cmsg_level == IPPROTO_TCP && cmsg->cmsg_type == TCP_CM_INQ) {
0435 memcpy(&inq, CMSG_DATA(cmsg), sizeof(inq));
0436 inq_found = true;
0437 continue;
0438 }
0439
0440 }
0441
0442 if (cfg_cmsg_types.timestampns) {
0443 if (!ts_found)
0444 xerror("TIMESTAMPNS not present\n");
0445 }
0446
0447 if (cfg_cmsg_types.tcp_inq) {
0448 if (!inq_found)
0449 xerror("TCP_INQ not present\n");
0450
0451 if (inq > 1024)
0452 xerror("tcp_inq %u is larger than one kbyte\n", inq);
0453 tcp_inq.last = inq;
0454 }
0455 }
0456
0457 static ssize_t do_recvmsg_cmsg(const int fd, char *buf, const size_t len)
0458 {
0459 char msg_buf[8192];
0460 struct iovec iov = {
0461 .iov_base = buf,
0462 .iov_len = len,
0463 };
0464 struct msghdr msg = {
0465 .msg_iov = &iov,
0466 .msg_iovlen = 1,
0467 .msg_control = msg_buf,
0468 .msg_controllen = sizeof(msg_buf),
0469 };
0470 int flags = 0;
0471 unsigned int last_hint = tcp_inq.last;
0472 int ret = recvmsg(fd, &msg, flags);
0473
0474 if (ret <= 0) {
0475 if (ret == 0 && tcp_inq.expect_eof)
0476 return ret;
0477
0478 if (ret == 0 && cfg_cmsg_types.tcp_inq)
0479 if (last_hint != 1 && last_hint != 0)
0480 xerror("EOF but last tcp_inq hint was %u\n", last_hint);
0481
0482 return ret;
0483 }
0484
0485 if (tcp_inq.expect_eof)
0486 xerror("expected EOF, last_hint %u, now %u\n",
0487 last_hint, tcp_inq.last);
0488
0489 if (msg.msg_controllen && !cfg_cmsg_types.cmsg_enabled)
0490 xerror("got %lu bytes of cmsg data, expected 0\n",
0491 (unsigned long)msg.msg_controllen);
0492
0493 if (msg.msg_controllen == 0 && cfg_cmsg_types.cmsg_enabled)
0494 xerror("%s\n", "got no cmsg data");
0495
0496 if (msg.msg_controllen)
0497 process_cmsg(&msg);
0498
0499 if (cfg_cmsg_types.tcp_inq) {
0500 if ((size_t)ret < len && last_hint > (unsigned int)ret) {
0501 if (ret + 1 != (int)last_hint) {
0502 int next = read(fd, msg_buf, sizeof(msg_buf));
0503
0504 xerror("read %u of %u, last_hint was %u tcp_inq hint now %u next_read returned %d/%m\n",
0505 ret, (unsigned int)len, last_hint, tcp_inq.last, next);
0506 } else {
0507 tcp_inq.expect_eof = true;
0508 }
0509 }
0510 }
0511
0512 return ret;
0513 }
0514
0515 static ssize_t do_rnd_read(const int fd, char *buf, const size_t len)
0516 {
0517 int ret = 0;
0518 char tmp[16384];
0519 size_t cap = rand();
0520
0521 cap &= 0xffff;
0522
0523 if (cap == 0)
0524 cap = 1;
0525 else if (cap > len)
0526 cap = len;
0527
0528 if (cfg_peek == CFG_WITH_PEEK) {
0529 ret = recv(fd, buf, cap, MSG_PEEK);
0530 ret = (ret < 0) ? ret : read(fd, tmp, ret);
0531 } else if (cfg_peek == CFG_AFTER_PEEK) {
0532 ret = recv(fd, buf, cap, MSG_PEEK);
0533 ret = (ret < 0) ? ret : read(fd, buf, cap);
0534 } else if (cfg_cmsg_types.cmsg_enabled) {
0535 ret = do_recvmsg_cmsg(fd, buf, cap);
0536 } else {
0537 ret = read(fd, buf, cap);
0538 }
0539
0540 return ret;
0541 }
0542
/*
 * Switch O_NONBLOCK on or off for fd, leaving the other file status
 * flags untouched.  Does nothing if the current flags cannot be read.
 */
static void set_nonblock(int fd, bool nonblock)
{
	int cur = fcntl(fd, F_GETFL);

	if (cur == -1)
		return;

	fcntl(fd, F_SETFL, nonblock ? (cur | O_NONBLOCK) : (cur & ~O_NONBLOCK));
}
0555
0556 static void shut_wr(int fd)
0557 {
0558
0559
0560
0561
0562 if (cfg_wait)
0563 usleep(cfg_wait);
0564
0565 shutdown(fd, SHUT_WR);
0566 }
0567
0568 static int copyfd_io_poll(int infd, int peerfd, int outfd, bool *in_closed_after_out)
0569 {
0570 struct pollfd fds = {
0571 .fd = peerfd,
0572 .events = POLLIN | POLLOUT,
0573 };
0574 unsigned int woff = 0, wlen = 0;
0575 char wbuf[8192];
0576
0577 set_nonblock(peerfd, true);
0578
0579 for (;;) {
0580 char rbuf[8192];
0581 ssize_t len;
0582
0583 if (fds.events == 0)
0584 break;
0585
0586 switch (poll(&fds, 1, poll_timeout)) {
0587 case -1:
0588 if (errno == EINTR)
0589 continue;
0590 perror("poll");
0591 return 1;
0592 case 0:
0593 fprintf(stderr, "%s: poll timed out (events: "
0594 "POLLIN %u, POLLOUT %u)\n", __func__,
0595 fds.events & POLLIN, fds.events & POLLOUT);
0596 return 2;
0597 }
0598
0599 if (fds.revents & POLLIN) {
0600 len = do_rnd_read(peerfd, rbuf, sizeof(rbuf));
0601 if (len == 0) {
0602
0603
0604
0605 fds.events &= ~POLLIN;
0606
0607 if ((fds.events & POLLOUT) == 0) {
0608 *in_closed_after_out = true;
0609
0610 break;
0611 }
0612
0613
0614 } else if (len < 0) {
0615 perror("read");
0616 return 3;
0617 }
0618
0619 do_write(outfd, rbuf, len);
0620 }
0621
0622 if (fds.revents & POLLOUT) {
0623 if (wlen == 0) {
0624 woff = 0;
0625 wlen = read(infd, wbuf, sizeof(wbuf));
0626 }
0627
0628 if (wlen > 0) {
0629 ssize_t bw;
0630
0631 bw = do_rnd_write(peerfd, wbuf + woff, wlen);
0632 if (bw < 0)
0633 return 111;
0634
0635 woff += bw;
0636 wlen -= bw;
0637 } else if (wlen == 0) {
0638
0639 fds.events &= ~POLLOUT;
0640
0641 if ((fds.events & POLLIN) == 0)
0642
0643 break;
0644
0645 shut_wr(peerfd);
0646 } else {
0647 if (errno == EINTR)
0648 continue;
0649 perror("read");
0650 return 4;
0651 }
0652 }
0653
0654 if (fds.revents & (POLLERR | POLLNVAL)) {
0655 fprintf(stderr, "Unexpected revents: "
0656 "POLLERR/POLLNVAL(%x)\n", fds.revents);
0657 return 5;
0658 }
0659 }
0660
0661
0662 if (cfg_remove)
0663 usleep(cfg_wait);
0664
0665 return 0;
0666 }
0667
/*
 * Copy everything readable from infd to outfd until EOF or error.
 *
 * Returns 0 at EOF, a negative value when reading failed, and the
 * (positive) size of the last chunk when it could not be written in
 * full to outfd.
 */
static int do_recvfile(int infd, int outfd)
{
	ssize_t r;

	for (;;) {
		char buf[16384];

		r = do_rnd_read(infd, buf, sizeof(buf));
		if (r < 0)
			perror("read");
		if (r <= 0)
			break;

		/* a short or failed write leaves r > 0, signalling the error */
		if (write(outfd, buf, r) != r)
			break;
	}

	return (int)r;
}
0686
/*
 * Map the first size bytes of infd read-only and write them to outfd.
 *
 * Returns 0 when everything was written, 1 if mmap() failed, and the
 * number of bytes left unwritten if write() failed part-way.
 */
static int do_mmap(int infd, int outfd, unsigned int size)
{
	char *map = mmap(NULL, size, PROT_READ, MAP_SHARED, infd, 0);
	size_t left = size;
	ssize_t sent = 0;

	if (map == MAP_FAILED) {
		perror("mmap");
		return 1;
	}

	while (left > 0) {
		ssize_t n = write(outfd, map + sent, left);

		if (n < 0) {
			perror("write");
			break;
		}

		sent += n;
		left -= n;
	}

	munmap(map, size);
	return left;
}
0715
/*
 * Return the size in bytes of the regular file behind fd.
 *
 * Returns -1 if fstat() fails, -2 if fd is not a regular file and -3
 * if the file is larger than INT_MAX.
 */
static int get_infd_size(int fd)
{
	struct stat sb;

	if (fstat(fd, &sb) < 0) {
		perror("fstat");
		return -1;
	}

	if (!S_ISREG(sb.st_mode)) {
		/* the fd may come from -i, not only from stdin */
		fprintf(stderr, "%s: input is not a regular file\n", __func__);
		return -2;
	}

	/* compare in off_t so that large sizes are not truncated first */
	if (sb.st_size > INT_MAX) {
		fprintf(stderr, "File too large: %lld\n", (long long)sb.st_size);
		return -3;
	}

	return (int)sb.st_size;
}
0741
/*
 * Transfer count bytes from infd (starting at its current offset) to
 * outfd using sendfile().
 *
 * Returns 0 once count bytes were sent, 3 if sendfile() failed or the
 * input ended before count bytes could be sent.
 */
static int do_sendfile(int infd, int outfd, unsigned int count)
{
	while (count > 0) {
		ssize_t r;

		r = sendfile(outfd, infd, NULL, count);
		if (r < 0) {
			perror("sendfile");
			return 3;
		}

		/* 0 means end of input: bail out instead of spinning forever */
		if (r == 0) {
			fprintf(stderr, "sendfile: unexpected EOF, %u bytes left\n", count);
			return 3;
		}

		count -= r;
	}

	return 0;
}
0758
0759 static int copyfd_io_mmap(int infd, int peerfd, int outfd,
0760 unsigned int size, bool *in_closed_after_out)
0761 {
0762 int err;
0763
0764 if (listen_mode) {
0765 err = do_recvfile(peerfd, outfd);
0766 if (err)
0767 return err;
0768
0769 err = do_mmap(infd, peerfd, size);
0770 } else {
0771 err = do_mmap(infd, peerfd, size);
0772 if (err)
0773 return err;
0774
0775 shut_wr(peerfd);
0776
0777 err = do_recvfile(peerfd, outfd);
0778 *in_closed_after_out = true;
0779 }
0780
0781 return err;
0782 }
0783
0784 static int copyfd_io_sendfile(int infd, int peerfd, int outfd,
0785 unsigned int size, bool *in_closed_after_out)
0786 {
0787 int err;
0788
0789 if (listen_mode) {
0790 err = do_recvfile(peerfd, outfd);
0791 if (err)
0792 return err;
0793
0794 err = do_sendfile(infd, peerfd, size);
0795 } else {
0796 err = do_sendfile(infd, peerfd, size);
0797 if (err)
0798 return err;
0799
0800 shut_wr(peerfd);
0801
0802 err = do_recvfile(peerfd, outfd);
0803 *in_closed_after_out = true;
0804 }
0805
0806 return err;
0807 }
0808
0809 static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd)
0810 {
0811 bool in_closed_after_out = false;
0812 struct timespec start, end;
0813 int file_size;
0814 int ret;
0815
0816 if (cfg_time && (clock_gettime(CLOCK_MONOTONIC, &start) < 0))
0817 xerror("can not fetch start time %d", errno);
0818
0819 switch (cfg_mode) {
0820 case CFG_MODE_POLL:
0821 ret = copyfd_io_poll(infd, peerfd, outfd, &in_closed_after_out);
0822 break;
0823
0824 case CFG_MODE_MMAP:
0825 file_size = get_infd_size(infd);
0826 if (file_size < 0)
0827 return file_size;
0828 ret = copyfd_io_mmap(infd, peerfd, outfd, file_size, &in_closed_after_out);
0829 break;
0830
0831 case CFG_MODE_SENDFILE:
0832 file_size = get_infd_size(infd);
0833 if (file_size < 0)
0834 return file_size;
0835 ret = copyfd_io_sendfile(infd, peerfd, outfd, file_size, &in_closed_after_out);
0836 break;
0837
0838 default:
0839 fprintf(stderr, "Invalid mode %d\n", cfg_mode);
0840
0841 die_usage();
0842 return 1;
0843 }
0844
0845 if (ret)
0846 return ret;
0847
0848 if (close_peerfd)
0849 close(peerfd);
0850
0851 if (cfg_time) {
0852 unsigned int delta_ms;
0853
0854 if (clock_gettime(CLOCK_MONOTONIC, &end) < 0)
0855 xerror("can not fetch end time %d", errno);
0856 delta_ms = (end.tv_sec - start.tv_sec) * 1000 + (end.tv_nsec - start.tv_nsec) / 1000000;
0857 if (delta_ms > cfg_time) {
0858 xerror("transfer slower than expected! runtime %d ms, expected %d ms",
0859 delta_ms, cfg_time);
0860 }
0861
0862
0863
0864
0865 if (in_closed_after_out)
0866 fprintf(stderr, "%d", delta_ms);
0867 }
0868
0869 return 0;
0870 }
0871
/*
 * Sanity-check the address returned by accept().
 *
 * Warnings are printed to stderr (nothing is fatal) when the source
 * port is 0, when salen does not match the sockaddr size for pf, or
 * when the address family stored in ss differs from pf.
 */
static void check_sockaddr(int pf, struct sockaddr_storage *ss,
			   socklen_t salen)
{
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	socklen_t wanted_size = 0;

	switch (pf) {
	case AF_INET:
		wanted_size = sizeof(*sin);
		sin = (void *)ss;
		if (!sin->sin_port)
			fprintf(stderr, "accept: something wrong: ip connection from port 0\n");
		break;
	case AF_INET6:
		wanted_size = sizeof(*sin6);
		sin6 = (void *)ss;
		if (!sin6->sin6_port)
			fprintf(stderr, "accept: something wrong: ipv6 connection from port 0\n");
		break;
	default:
		fprintf(stderr, "accept: Unknown pf %d, salen %u\n", pf, salen);
		return;
	}

	if (salen != wanted_size)
		fprintf(stderr, "accept: size mismatch, got %d expected %d\n",
			(int)salen, (int)wanted_size);

	/* arguments follow the message: expected family first, actual second */
	if (ss->ss_family != pf)
		fprintf(stderr, "accept: pf mismatch, expect %d, ss_family is %d\n",
			pf, (int)ss->ss_family);
}
0905
/*
 * Verify that getpeername() on fd reports the same address that
 * accept() returned in ss/salen.  Mismatches are printed to stderr.
 */
static void check_getpeername(int fd, struct sockaddr_storage *ss, socklen_t salen)
{
	char accept_host[INET6_ADDRSTRLEN], accept_serv[INET6_ADDRSTRLEN];
	char peer_host[INET6_ADDRSTRLEN], peer_serv[INET6_ADDRSTRLEN];
	struct sockaddr_storage peerss;
	socklen_t peersalen = sizeof(peerss);

	if (getpeername(fd, (struct sockaddr *)&peerss, &peersalen) < 0) {
		perror("getpeername");
		return;
	}

	if (peersalen != salen) {
		fprintf(stderr, "%s: %d vs %d\n", __func__, peersalen, salen);
		return;
	}

	if (memcmp(ss, &peerss, peersalen) == 0)
		return;

	xgetnameinfo((struct sockaddr *)ss, salen,
		     accept_host, sizeof(accept_host),
		     accept_serv, sizeof(accept_serv));

	xgetnameinfo((struct sockaddr *)&peerss, peersalen,
		     peer_host, sizeof(peer_host),
		     peer_serv, sizeof(peer_serv));

	fprintf(stderr, "%s: memcmp failure: accept %s vs peername %s, %s vs %s salen %d vs %d\n",
		__func__, accept_host, peer_host, accept_serv, peer_serv, peersalen, salen);
}
0937
0938 static void check_getpeername_connect(int fd)
0939 {
0940 struct sockaddr_storage ss;
0941 socklen_t salen = sizeof(ss);
0942 char a[INET6_ADDRSTRLEN];
0943 char b[INET6_ADDRSTRLEN];
0944
0945 if (getpeername(fd, (struct sockaddr *)&ss, &salen) < 0) {
0946 perror("getpeername");
0947 return;
0948 }
0949
0950 xgetnameinfo((struct sockaddr *)&ss, salen,
0951 a, sizeof(a), b, sizeof(b));
0952
0953 if (strcmp(cfg_host, a) || strcmp(cfg_port, b))
0954 fprintf(stderr, "%s: %s vs %s, %s vs %s\n", __func__,
0955 cfg_host, a, cfg_port, b);
0956 }
0957
0958 static void maybe_close(int fd)
0959 {
0960 unsigned int r = rand();
0961
0962 if (!(cfg_join || cfg_remove || cfg_repeat > 1) && (r & 1))
0963 close(fd);
0964 }
0965
0966 int main_loop_s(int listensock)
0967 {
0968 struct sockaddr_storage ss;
0969 struct pollfd polls;
0970 socklen_t salen;
0971 int remotesock;
0972 int fd = 0;
0973
0974 again:
0975 polls.fd = listensock;
0976 polls.events = POLLIN;
0977
0978 switch (poll(&polls, 1, poll_timeout)) {
0979 case -1:
0980 perror("poll");
0981 return 1;
0982 case 0:
0983 fprintf(stderr, "%s: timed out\n", __func__);
0984 close(listensock);
0985 return 2;
0986 }
0987
0988 salen = sizeof(ss);
0989 remotesock = accept(listensock, (struct sockaddr *)&ss, &salen);
0990 if (remotesock >= 0) {
0991 maybe_close(listensock);
0992 check_sockaddr(pf, &ss, salen);
0993 check_getpeername(remotesock, &ss, salen);
0994
0995 if (cfg_input) {
0996 fd = open(cfg_input, O_RDONLY);
0997 if (fd < 0)
0998 xerror("can't open %s: %d", cfg_input, errno);
0999 }
1000
1001 SOCK_TEST_TCPULP(remotesock, 0);
1002
1003 copyfd_io(fd, remotesock, 1, true);
1004 } else {
1005 perror("accept");
1006 return 1;
1007 }
1008
1009 if (--cfg_repeat > 0) {
1010 if (cfg_input)
1011 close(fd);
1012 goto again;
1013 }
1014
1015 return 0;
1016 }
1017
/*
 * Seed rand() from /dev/urandom.
 *
 * If the device cannot be opened or does not deliver a full seed, a
 * fallback derived from the current time and the process id is used,
 * so srand() never sees an uninitialised value.
 */
static void init_rng(void)
{
	unsigned int seed = (unsigned int)time(NULL) ^ (unsigned int)getpid();
	int fd = open("/dev/urandom", O_RDONLY);

	/* 0 is a valid descriptor, only negative values signal failure */
	if (fd >= 0) {
		unsigned int rnd;

		if (read(fd, &rnd, sizeof(rnd)) == (ssize_t)sizeof(rnd))
			seed = rnd;
		close(fd);
	}

	srand(seed);
}
1033
/* setsockopt() wrapper that reports the error and exits with status 1 on failure. */
static void xsetsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen)
{
	if (setsockopt(fd, level, optname, optval, optlen) == 0)
		return;

	perror("setsockopt");
	exit(1);
}
1044
1045 static void apply_cmsg_types(int fd, const struct cfg_cmsg_types *cmsg)
1046 {
1047 static const unsigned int on = 1;
1048
1049 if (cmsg->timestampns)
1050 xsetsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW, &on, sizeof(on));
1051 if (cmsg->tcp_inq)
1052 xsetsockopt(fd, IPPROTO_TCP, TCP_INQ, &on, sizeof(on));
1053 }
1054
1055 static void parse_cmsg_types(const char *type)
1056 {
1057 char *next = strchr(type, ',');
1058 unsigned int len = 0;
1059
1060 cfg_cmsg_types.cmsg_enabled = 1;
1061
1062 if (next) {
1063 parse_cmsg_types(next + 1);
1064 len = next - type;
1065 } else {
1066 len = strlen(type);
1067 }
1068
1069 if (strncmp(type, "TIMESTAMPNS", len) == 0) {
1070 cfg_cmsg_types.timestampns = 1;
1071 return;
1072 }
1073
1074 if (strncmp(type, "TCPINQ", len) == 0) {
1075 cfg_cmsg_types.tcp_inq = 1;
1076 return;
1077 }
1078
1079 fprintf(stderr, "Unrecognized cmsg option %s\n", type);
1080 exit(1);
1081 }
1082
1083 static void parse_setsock_options(const char *name)
1084 {
1085 char *next = strchr(name, ',');
1086 unsigned int len = 0;
1087
1088 if (next) {
1089 parse_setsock_options(next + 1);
1090 len = next - name;
1091 } else {
1092 len = strlen(name);
1093 }
1094
1095 if (strncmp(name, "TRANSPARENT", len) == 0) {
1096 cfg_sockopt_types.transparent = 1;
1097 return;
1098 }
1099
1100 fprintf(stderr, "Unrecognized setsockopt option %s\n", name);
1101 exit(1);
1102 }
1103
1104 void xdisconnect(int fd, int addrlen)
1105 {
1106 struct sockaddr_storage empty;
1107 int msec_sleep = 10;
1108 int queued = 1;
1109 int i;
1110
1111 shutdown(fd, SHUT_WR);
1112
1113
1114
1115
1116 for (i = 0; ; i += msec_sleep) {
1117 if (ioctl(fd, SIOCOUTQ, &queued) < 0)
1118 xerror("can't query out socket queue: %d", errno);
1119
1120 if (!queued)
1121 break;
1122
1123 if (i > poll_timeout)
1124 xerror("timeout while waiting for spool to complete");
1125 usleep(msec_sleep * 1000);
1126 }
1127
1128 memset(&empty, 0, sizeof(empty));
1129 empty.ss_family = AF_UNSPEC;
1130 if (connect(fd, (struct sockaddr *)&empty, addrlen) < 0)
1131 xerror("can't disconnect: %d", errno);
1132 }
1133
1134 int main_loop(void)
1135 {
1136 int fd, ret, fd_in = 0;
1137 struct addrinfo *peer;
1138
1139
1140 fd = sock_connect_mptcp(cfg_host, cfg_port, cfg_sock_proto, &peer);
1141 if (fd < 0)
1142 return 2;
1143
1144 again:
1145 check_getpeername_connect(fd);
1146
1147 SOCK_TEST_TCPULP(fd, cfg_sock_proto);
1148
1149 if (cfg_rcvbuf)
1150 set_rcvbuf(fd, cfg_rcvbuf);
1151 if (cfg_sndbuf)
1152 set_sndbuf(fd, cfg_sndbuf);
1153 if (cfg_cmsg_types.cmsg_enabled)
1154 apply_cmsg_types(fd, &cfg_cmsg_types);
1155
1156 if (cfg_input) {
1157 fd_in = open(cfg_input, O_RDONLY);
1158 if (fd < 0)
1159 xerror("can't open %s:%d", cfg_input, errno);
1160 }
1161
1162
1163 ret = copyfd_io(fd_in, fd, 1, cfg_repeat == 1);
1164 if (ret)
1165 return ret;
1166
1167 if (--cfg_repeat > 0) {
1168 xdisconnect(fd, peer->ai_addrlen);
1169
1170
1171
1172
1173 set_nonblock(fd, false);
1174 if (connect(fd, peer->ai_addr, peer->ai_addrlen))
1175 xerror("can't reconnect: %d", errno);
1176 if (cfg_input)
1177 close(fd_in);
1178 goto again;
1179 }
1180 return 0;
1181 }
1182
/*
 * Map the -s argument ("MPTCP" or "TCP", case-insensitive) to the
 * protocol number passed to socket().  Anything else prints an error
 * followed by the usage text and terminates the process.
 */
int parse_proto(const char *proto)
{
	if (!strcasecmp(proto, "MPTCP"))
		return IPPROTO_MPTCP;
	if (!strcasecmp(proto, "TCP"))
		return IPPROTO_TCP;

	fprintf(stderr, "Unknown protocol: %s\n", proto);
	die_usage();

	/* not reached: die_usage() exits */
	return 0;
}
1196
1197 int parse_mode(const char *mode)
1198 {
1199 if (!strcasecmp(mode, "poll"))
1200 return CFG_MODE_POLL;
1201 if (!strcasecmp(mode, "mmap"))
1202 return CFG_MODE_MMAP;
1203 if (!strcasecmp(mode, "sendfile"))
1204 return CFG_MODE_SENDFILE;
1205
1206 fprintf(stderr, "Unknown test mode: %s\n", mode);
1207 fprintf(stderr, "Supported modes are:\n");
1208 fprintf(stderr, "\t\t\"poll\" - interleaved read/write using poll()\n");
1209 fprintf(stderr, "\t\t\"mmap\" - send entire input file (mmap+write), then read response (-l will read input first)\n");
1210 fprintf(stderr, "\t\t\"sendfile\" - send entire input file (sendfile), then read response (-l will read input first)\n");
1211
1212 die_usage();
1213
1214
1215 return 0;
1216 }
1217
1218 int parse_peek(const char *mode)
1219 {
1220 if (!strcasecmp(mode, "saveWithPeek"))
1221 return CFG_WITH_PEEK;
1222 if (!strcasecmp(mode, "saveAfterPeek"))
1223 return CFG_AFTER_PEEK;
1224
1225 fprintf(stderr, "Unknown: %s\n", mode);
1226 fprintf(stderr, "Supported MSG_PEEK mode are:\n");
1227 fprintf(stderr,
1228 "\t\t\"saveWithPeek\" - recv data with flags 'MSG_PEEK' and save the peek data into file\n");
1229 fprintf(stderr,
1230 "\t\t\"saveAfterPeek\" - read and save data into file after recv with flags 'MSG_PEEK'\n");
1231
1232 die_usage();
1233
1234
1235 return 0;
1236 }
1237
/*
 * Parse a non-negative buffer size (used for -S and -R) in any base
 * accepted by strtoul() with base 0.
 *
 * The whole string must be a number and the value must fit in an int;
 * otherwise an error and the usage text are printed and the process
 * terminates.
 */
static int parse_int(const char *size)
{
	unsigned long s;
	char *end;

	errno = 0;

	s = strtoul(size, &end, 0);

	if (errno) {
		fprintf(stderr, "Invalid buffer size %s (%s)\n",
			size, strerror(errno));
		die_usage();
	}

	/* strtoul() reports "no digits" and trailing junk only via end */
	if (end == size || *end != '\0') {
		fprintf(stderr, "Invalid buffer size %s (%s)\n",
			size, strerror(EINVAL));
		die_usage();
	}

	if (s > INT_MAX) {
		fprintf(stderr, "Invalid buffer size %s (%s)\n",
			size, strerror(ERANGE));
		die_usage();
	}

	return (int)s;
}
1260
1261 static void parse_opts(int argc, char **argv)
1262 {
1263 int c;
1264
1265 while ((c = getopt(argc, argv, "6c:hi:I:jlm:M:o:p:P:r:R:s:S:t:T:w:")) != -1) {
1266 switch (c) {
1267 case 'j':
1268 cfg_join = true;
1269 cfg_mode = CFG_MODE_POLL;
1270 break;
1271 case 'r':
1272 cfg_remove = true;
1273 cfg_mode = CFG_MODE_POLL;
1274 cfg_wait = 400000;
1275 cfg_do_w = atoi(optarg);
1276 if (cfg_do_w <= 0)
1277 cfg_do_w = 50;
1278 break;
1279 case 'i':
1280 cfg_input = optarg;
1281 break;
1282 case 'I':
1283 cfg_repeat = atoi(optarg);
1284 break;
1285 case 'l':
1286 listen_mode = true;
1287 break;
1288 case 'p':
1289 cfg_port = optarg;
1290 break;
1291 case 's':
1292 cfg_sock_proto = parse_proto(optarg);
1293 break;
1294 case 'h':
1295 die_usage();
1296 break;
1297 case '6':
1298 pf = AF_INET6;
1299 break;
1300 case 't':
1301 poll_timeout = atoi(optarg) * 1000;
1302 if (poll_timeout <= 0)
1303 poll_timeout = -1;
1304 break;
1305 case 'T':
1306 cfg_time = atoi(optarg);
1307 break;
1308 case 'm':
1309 cfg_mode = parse_mode(optarg);
1310 break;
1311 case 'S':
1312 cfg_sndbuf = parse_int(optarg);
1313 break;
1314 case 'R':
1315 cfg_rcvbuf = parse_int(optarg);
1316 break;
1317 case 'w':
1318 cfg_wait = atoi(optarg)*1000000;
1319 break;
1320 case 'M':
1321 cfg_mark = strtol(optarg, NULL, 0);
1322 break;
1323 case 'P':
1324 cfg_peek = parse_peek(optarg);
1325 break;
1326 case 'c':
1327 parse_cmsg_types(optarg);
1328 break;
1329 case 'o':
1330 parse_setsock_options(optarg);
1331 break;
1332 }
1333 }
1334
1335 if (optind + 1 != argc)
1336 die_usage();
1337 cfg_host = argv[optind];
1338
1339 if (strchr(cfg_host, ':'))
1340 pf = AF_INET6;
1341 }
1342
1343 int main(int argc, char *argv[])
1344 {
1345 init_rng();
1346
1347 signal(SIGUSR1, handle_signal);
1348 parse_opts(argc, argv);
1349
1350 if (listen_mode) {
1351 int fd = sock_listen_mptcp(cfg_host, cfg_port);
1352
1353 if (fd < 0)
1354 return 1;
1355
1356 if (cfg_rcvbuf)
1357 set_rcvbuf(fd, cfg_rcvbuf);
1358 if (cfg_sndbuf)
1359 set_sndbuf(fd, cfg_sndbuf);
1360 if (cfg_mark)
1361 set_mark(fd, cfg_mark);
1362 if (cfg_cmsg_types.cmsg_enabled)
1363 apply_cmsg_types(fd, &cfg_cmsg_types);
1364
1365 return main_loop_s(fd);
1366 }
1367
1368 return main_loop();
1369 }