0001
0002
0003
0004
0005
0006
0007 #define _GNU_SOURCE
0008
0009 #include <arpa/inet.h>
0010 #include <errno.h>
0011 #include <error.h>
0012 #include <linux/filter.h>
0013 #include <linux/bpf.h>
0014 #include <linux/in.h>
0015 #include <linux/unistd.h>
0016 #include <sched.h>
0017 #include <stdio.h>
0018 #include <stdlib.h>
0019 #include <string.h>
0020 #include <sys/epoll.h>
0021 #include <sys/types.h>
0022 #include <sys/socket.h>
0023 #include <unistd.h>
0024 #include <numa.h>
0025
0026 #include "../kselftest.h"
0027
/* Port the receive sockets bind to and the send sockets connect to. */
0028 static const int PORT = 8888;
0029
0030 static void build_rcv_group(int *rcv_fd, size_t len, int family, int proto)
0031 {
0032 struct sockaddr_storage addr;
0033 struct sockaddr_in *addr4;
0034 struct sockaddr_in6 *addr6;
0035 size_t i;
0036 int opt;
0037
0038 switch (family) {
0039 case AF_INET:
0040 addr4 = (struct sockaddr_in *)&addr;
0041 addr4->sin_family = AF_INET;
0042 addr4->sin_addr.s_addr = htonl(INADDR_ANY);
0043 addr4->sin_port = htons(PORT);
0044 break;
0045 case AF_INET6:
0046 addr6 = (struct sockaddr_in6 *)&addr;
0047 addr6->sin6_family = AF_INET6;
0048 addr6->sin6_addr = in6addr_any;
0049 addr6->sin6_port = htons(PORT);
0050 break;
0051 default:
0052 error(1, 0, "Unsupported family %d", family);
0053 }
0054
0055 for (i = 0; i < len; ++i) {
0056 rcv_fd[i] = socket(family, proto, 0);
0057 if (rcv_fd[i] < 0)
0058 error(1, errno, "failed to create receive socket");
0059
0060 opt = 1;
0061 if (setsockopt(rcv_fd[i], SOL_SOCKET, SO_REUSEPORT, &opt,
0062 sizeof(opt)))
0063 error(1, errno, "failed to set SO_REUSEPORT");
0064
0065 if (bind(rcv_fd[i], (struct sockaddr *)&addr, sizeof(addr)))
0066 error(1, errno, "failed to bind receive socket");
0067
0068 if (proto == SOCK_STREAM && listen(rcv_fd[i], len * 10))
0069 error(1, errno, "failed to listen on receive port");
0070 }
0071 }
0072
0073 static void attach_bpf(int fd)
0074 {
0075 static char bpf_log_buf[65536];
0076 static const char bpf_license[] = "";
0077
0078 int bpf_fd;
0079 const struct bpf_insn prog[] = {
0080
0081 { BPF_JMP | BPF_CALL, 0, 0, 0, BPF_FUNC_get_numa_node_id },
0082
0083 { BPF_JMP | BPF_EXIT, 0, 0, 0, 0 }
0084 };
0085 union bpf_attr attr;
0086
0087 memset(&attr, 0, sizeof(attr));
0088 attr.prog_type = BPF_PROG_TYPE_SOCKET_FILTER;
0089 attr.insn_cnt = ARRAY_SIZE(prog);
0090 attr.insns = (unsigned long) &prog;
0091 attr.license = (unsigned long) &bpf_license;
0092 attr.log_buf = (unsigned long) &bpf_log_buf;
0093 attr.log_size = sizeof(bpf_log_buf);
0094 attr.log_level = 1;
0095
0096 bpf_fd = syscall(__NR_bpf, BPF_PROG_LOAD, &attr, sizeof(attr));
0097 if (bpf_fd < 0)
0098 error(1, errno, "ebpf error. log:\n%s\n", bpf_log_buf);
0099
0100 if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &bpf_fd,
0101 sizeof(bpf_fd)))
0102 error(1, errno, "failed to set SO_ATTACH_REUSEPORT_EBPF");
0103
0104 close(bpf_fd);
0105 }
0106
0107 static void send_from_node(int node_id, int family, int proto)
0108 {
0109 struct sockaddr_storage saddr, daddr;
0110 struct sockaddr_in *saddr4, *daddr4;
0111 struct sockaddr_in6 *saddr6, *daddr6;
0112 int fd;
0113
0114 switch (family) {
0115 case AF_INET:
0116 saddr4 = (struct sockaddr_in *)&saddr;
0117 saddr4->sin_family = AF_INET;
0118 saddr4->sin_addr.s_addr = htonl(INADDR_ANY);
0119 saddr4->sin_port = 0;
0120
0121 daddr4 = (struct sockaddr_in *)&daddr;
0122 daddr4->sin_family = AF_INET;
0123 daddr4->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
0124 daddr4->sin_port = htons(PORT);
0125 break;
0126 case AF_INET6:
0127 saddr6 = (struct sockaddr_in6 *)&saddr;
0128 saddr6->sin6_family = AF_INET6;
0129 saddr6->sin6_addr = in6addr_any;
0130 saddr6->sin6_port = 0;
0131
0132 daddr6 = (struct sockaddr_in6 *)&daddr;
0133 daddr6->sin6_family = AF_INET6;
0134 daddr6->sin6_addr = in6addr_loopback;
0135 daddr6->sin6_port = htons(PORT);
0136 break;
0137 default:
0138 error(1, 0, "Unsupported family %d", family);
0139 }
0140
0141 if (numa_run_on_node(node_id) < 0)
0142 error(1, errno, "failed to pin to node");
0143
0144 fd = socket(family, proto, 0);
0145 if (fd < 0)
0146 error(1, errno, "failed to create send socket");
0147
0148 if (bind(fd, (struct sockaddr *)&saddr, sizeof(saddr)))
0149 error(1, errno, "failed to bind send socket");
0150
0151 if (connect(fd, (struct sockaddr *)&daddr, sizeof(daddr)))
0152 error(1, errno, "failed to connect send socket");
0153
0154 if (send(fd, "a", 1, 0) < 0)
0155 error(1, errno, "failed to send message");
0156
0157 close(fd);
0158 }
0159
/*
 * Wait for one message on the receive group and verify which socket got it.
 *
 * Blocks in epoll_wait() on @epfd until one socket is readable.  For
 * SOCK_STREAM the pending connection is accepted and read from; otherwise
 * the datagram is read directly from the ready socket.  The ready descriptor
 * is then looked up in rcv_fd[0..len-1] and its index must equal @node_id.
 *
 * Any failure, including an index mismatch, terminates the program through
 * error().
 */
static
void receive_on_node(int *rcv_fd, int len, int epfd, int node_id, int proto)
{
	struct epoll_event ev;
	int i, fd;
	char buf[8];

	i = epoll_wait(epfd, &ev, 1, -1);
	if (i < 0)
		error(1, errno, "epoll_wait failed");

	if (proto == SOCK_STREAM) {
		fd = accept(ev.data.fd, NULL, NULL);
		if (fd < 0)
			error(1, errno, "failed to accept");
		i = recv(fd, buf, sizeof(buf), 0);
		/*
		 * Report a recv() failure before close(): close() may
		 * overwrite errno and the message would then be misleading.
		 */
		if (i < 0)
			error(1, errno, "failed to recv");
		close(fd);
	} else {
		i = recv(ev.data.fd, buf, sizeof(buf), 0);
		if (i < 0)
			error(1, errno, "failed to recv");
	}

	/* Map the ready descriptor back to its index in the group. */
	for (i = 0; i < len; ++i)
		if (ev.data.fd == rcv_fd[i])
			break;
	if (i == len)
		error(1, 0, "failed to find socket");
	fprintf(stderr, "send node %d, receive socket %d\n", node_id, i);
	if (node_id != i)
		error(1, 0, "node id/receive socket mismatch");
}
0193
0194 static void test(int *rcv_fd, int len, int family, int proto)
0195 {
0196 struct epoll_event ev;
0197 int epfd, node;
0198
0199 build_rcv_group(rcv_fd, len, family, proto);
0200 attach_bpf(rcv_fd[0]);
0201
0202 epfd = epoll_create(1);
0203 if (epfd < 0)
0204 error(1, errno, "failed to create epoll");
0205 for (node = 0; node < len; ++node) {
0206 ev.events = EPOLLIN;
0207 ev.data.fd = rcv_fd[node];
0208 if (epoll_ctl(epfd, EPOLL_CTL_ADD, rcv_fd[node], &ev))
0209 error(1, errno, "failed to register sock epoll");
0210 }
0211
0212
0213 for (node = 0; node < len; ++node) {
0214 if (!numa_bitmask_isbitset(numa_nodes_ptr, node))
0215 continue;
0216 send_from_node(node, family, proto);
0217 receive_on_node(rcv_fd, len, epfd, node, proto);
0218 }
0219
0220
0221 for (node = len - 1; node >= 0; --node) {
0222 if (!numa_bitmask_isbitset(numa_nodes_ptr, node))
0223 continue;
0224 send_from_node(node, family, proto);
0225 receive_on_node(rcv_fd, len, epfd, node, proto);
0226 }
0227
0228 close(epfd);
0229 for (node = 0; node < len; ++node)
0230 close(rcv_fd[node]);
0231 }
0232
/*
 * Entry point: skip when the NUMA API is unavailable, otherwise allocate one
 * descriptor slot per possible node (numa_max_node() + 1) and run test() for
 * UDP and TCP over IPv4 and IPv6, printing a banner before each run.
 */
int main(void)
{
	static const struct {
		const char *banner;
		int family;
		int proto;
	} runs[] = {
		{ "---- IPv4 UDP ----\n", AF_INET,  SOCK_DGRAM  },
		{ "---- IPv6 UDP ----\n", AF_INET6, SOCK_DGRAM  },
		{ "---- IPv4 TCP ----\n", AF_INET,  SOCK_STREAM },
		{ "---- IPv6 TCP ----\n", AF_INET6, SOCK_STREAM },
	};
	size_t run;
	int node_count;
	int *fds;

	if (numa_available() < 0)
		ksft_exit_skip("no numa api support\n");

	node_count = numa_max_node() + 1;

	fds = calloc(node_count, sizeof(*fds));
	if (fds == NULL)
		error(1, 0, "failed to allocate array");

	for (run = 0; run < sizeof(runs) / sizeof(runs[0]); run++) {
		fputs(runs[run].banner, stderr);
		test(fds, node_count, runs[run].family, runs[run].proto);
	}

	free(fds);

	fputs("SUCCESS\n", stderr);
	return 0;
}