1 /* SPDX-License-Identifier: MIT */
2 /* based on linux-kernel/tools/testing/selftests/net/msg_zerocopy.c */
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <stdint.h>
6 #include <assert.h>
7 #include <errno.h>
8 #include <limits.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11 #include <stdbool.h>
12 #include <stdarg.h>
13 #include <string.h>
14 #include <pthread.h>
15
16 #include <poll.h>
17 #include <sched.h>
18 #include <arpa/inet.h>
19 #include <linux/if_packet.h>
20 #include <linux/ipv6.h>
21 #include <linux/socket.h>
22 #include <linux/sockios.h>
23 #include <net/ethernet.h>
24 #include <net/if.h>
25 #include <netinet/ip.h>
26 #include <netinet/in.h>
27 #include <netinet/ip6.h>
28 #include <netinet/tcp.h>
29 #include <netinet/udp.h>
30 #include <sys/socket.h>
31 #include <sys/time.h>
32 #include <sys/resource.h>
33 #include <sys/un.h>
34 #include <sys/ioctl.h>
35 #include <sys/socket.h>
36 #include <sys/stat.h>
37 #include <sys/time.h>
38 #include <sys/types.h>
39 #include <sys/wait.h>
40 #include <sys/mman.h>
41 #include <linux/mman.h>
42 #include <signal.h>
43
44 #include "liburing.h"
45
/* NOTE(review): not referenced anywhere else in this file; possibly a leftover. */
#define ZC_TAG 0xfffffffULL
/* Upper bound for -n (requests queued per submit batch); enforced in parse_opts(). */
#define MAX_SUBMIT_NR 512
/* Size of the static threads[] array and upper bound for -T. */
#define MAX_THREADS 100
49
/* Per-worker state: one threads[] entry per sender (do_test) or receiver (do_rx) thread. */
struct thread_data {
	pthread_t thread;		/* handle from pthread_create() in main() */
	void *ret;			/* its address is the thread's pthread_exit() value */
	int idx;			/* worker index, reported on connect() failure */
	unsigned long long packets;	/* tx: successful send CQEs; rx: recv calls / datagrams */
	unsigned long long bytes;	/* bytes sent (tx) or received (rx) */
	unsigned long long dt_ms;	/* tx run duration in ms; never set by do_rx() */
	struct sockaddr_storage dst_addr; /* tx destination, filled in by do_test() */
	int fd;				/* rx socket handed out by do_setup_rx() */
};
60
61 static bool cfg_reg_ringfd = true;
62 static bool cfg_fixed_files = 1;
63 static bool cfg_zc = 1;
64 static int cfg_nr_reqs = 8;
65 static bool cfg_fixed_buf = 1;
66 static bool cfg_hugetlb = 0;
67 static bool cfg_defer_taskrun = 0;
68 static int cfg_cpu = -1;
69 static bool cfg_rx = 0;
70 static unsigned cfg_nr_threads = 1;
71
72 static int cfg_family = PF_UNSPEC;
73 static int cfg_type = 0;
74 static int cfg_payload_len;
75 static int cfg_port = 8000;
76 static int cfg_runtime_ms = 4200;
77 static bool cfg_rx_poll = false;
78
79 static socklen_t cfg_alen;
80 static char *str_addr = NULL;
81
82 static char payload_buf[IP_MAXPACKET] __attribute__((aligned(4096)));
83 static char *payload;
84 static struct thread_data threads[MAX_THREADS];
85 static pthread_barrier_t barrier;
86
87 static bool should_stop = false;
88
sigint_handler(int sig)89 static void sigint_handler(__attribute__((__unused__)) int sig)
90 {
91 /* kill if should_stop can't unblock threads fast enough */
92 if (should_stop)
93 _exit(-1);
94 should_stop = true;
95 }
96
/*
 * Implementation of error(3): print the formatted message to stderr,
 * followed by ": <strerror(errnum)>" when errnum is non-zero and a newline,
 * then exit with @status.
 *
 * @errnum may be a positive errno value (as read from errno or returned by
 * pthread functions) or a negated one (as returned by liburing calls and
 * found in cqe->res); both are reported with the same strerror() text.
 */
static void t_error(int status, int errnum, const char *format, ...)
{
	va_list args;

	va_start(args, format);
	vfprintf(stderr, format, args);
	va_end(args);

	if (errnum) {
		/* -errno convention; INT_MIN has no positive counterpart */
		if (errnum < 0 && errnum != INT_MIN)
			errnum = -errnum;
		fprintf(stderr, ": %s", strerror(errnum));
	}

	fprintf(stderr, "\n");
	exit(status);
}
113
set_cpu_affinity(void)114 static void set_cpu_affinity(void)
115 {
116 cpu_set_t mask;
117
118 if (cfg_cpu == -1)
119 return;
120
121 CPU_ZERO(&mask);
122 CPU_SET(cfg_cpu, &mask);
123 if (sched_setaffinity(0, sizeof(mask), &mask))
124 t_error(1, errno, "unable to pin cpu\n");
125 }
126
set_iowq_affinity(struct io_uring * ring)127 static void set_iowq_affinity(struct io_uring *ring)
128 {
129 cpu_set_t mask;
130 int ret;
131
132 if (cfg_cpu == -1)
133 return;
134
135 CPU_ZERO(&mask);
136 CPU_SET(cfg_cpu, &mask);
137 ret = io_uring_register_iowq_aff(ring, 1, &mask);
138 if (ret)
139 t_error(1, ret, "unabled to set io-wq affinity\n");
140 }
141
/*
 * Current wall-clock time in milliseconds (gettimeofday() based). Used only
 * to measure and bound run durations.
 */
static unsigned long gettimeofday_ms(void)
{
	struct timeval tv;

	gettimeofday(&tv, NULL);
	/*
	 * Do the arithmetic in unsigned long: where tv_sec is a 32-bit signed
	 * type, tv_sec * 1000 overflows (undefined behaviour), whereas unsigned
	 * arithmetic wraps in a defined way. Results are unchanged on 64-bit.
	 */
	return (unsigned long)tv.tv_sec * 1000UL +
	       (unsigned long)tv.tv_usec / 1000UL;
}
149
/* setsockopt() wrapper for int-valued options; any failure is fatal. */
static void do_setsockopt(int fd, int level, int optname, int val)
{
	const int err = setsockopt(fd, level, optname, &val, sizeof(val));

	if (err != 0)
		t_error(1, errno, "setsockopt %d.%d: %d", level, optname, val);
}
155
setup_sockaddr(int domain,const char * str_addr,struct sockaddr_storage * sockaddr)156 static void setup_sockaddr(int domain, const char *str_addr,
157 struct sockaddr_storage *sockaddr)
158 {
159 struct sockaddr_in6 *addr6 = (void *) sockaddr;
160 struct sockaddr_in *addr4 = (void *) sockaddr;
161 int port = cfg_port;
162
163 switch (domain) {
164 case PF_INET:
165 memset(addr4, 0, sizeof(*addr4));
166 addr4->sin_family = AF_INET;
167 addr4->sin_port = htons(port);
168 if (str_addr &&
169 inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
170 t_error(1, 0, "ipv4 parse error: %s", str_addr);
171 break;
172 case PF_INET6:
173 memset(addr6, 0, sizeof(*addr6));
174 addr6->sin6_family = AF_INET6;
175 addr6->sin6_port = htons(port);
176 if (str_addr &&
177 inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
178 t_error(1, 0, "ipv6 parse error: %s", str_addr);
179 break;
180 default:
181 t_error(1, 0, "illegal domain");
182 }
183 }
184
/*
 * Wait (bounded) for any of @events on @fd.
 *
 * Returns non-zero if one of the requested events is pending and 0 if the
 * wait timed out or only other events (e.g. errors) were reported. A poll()
 * failure is fatal.
 *
 * The wait is bounded so that deadline loops such as do_rx() get to
 * re-check their deadline even when no more data arrives. Otherwise a UDP
 * receiver, which never sees EOF, would block here forever once the sender
 * stops.
 */
static int do_poll(int fd, int events)
{
	const int poll_timeout_ms = 500;
	struct pollfd pfd = {
		.fd = fd,
		.events = events,
		.revents = 0,
	};
	int ret;

	ret = poll(&pfd, 1, poll_timeout_ms);
	if (ret == -1)
		t_error(1, errno, "poll");

	return ret && (pfd.revents & events);
}
200
201 /* Flush all outstanding bytes for the tcp receive queue */
do_flush_tcp(struct thread_data * td,int fd)202 static int do_flush_tcp(struct thread_data *td, int fd)
203 {
204 int ret;
205
206 /* MSG_TRUNC flushes up to len bytes */
207 ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
208 if (ret == -1 && errno == EAGAIN)
209 return 0;
210 if (ret == -1)
211 t_error(1, errno, "flush");
212 if (!ret)
213 return 1;
214
215 td->packets++;
216 td->bytes += ret;
217 return 0;
218 }
219
220 /* Flush all outstanding datagrams. Verify first few bytes of each. */
do_flush_datagram(struct thread_data * td,int fd)221 static int do_flush_datagram(struct thread_data *td, int fd)
222 {
223 long ret, off = 0;
224 char buf[64];
225
226 /* MSG_TRUNC will return full datagram length */
227 ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC);
228 if (ret == -1 && errno == EAGAIN)
229 return 0;
230
231 if (ret == -1)
232 t_error(1, errno, "recv");
233 if (ret != cfg_payload_len)
234 t_error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len);
235 if ((unsigned long) ret > sizeof(buf) - off)
236 ret = sizeof(buf) - off;
237 if (memcmp(buf + off, payload, ret))
238 t_error(1, 0, "recv: data mismatch");
239
240 td->packets++;
241 td->bytes += cfg_payload_len;
242 return 0;
243 }
244
do_setup_rx(int domain,int type,int protocol)245 static void do_setup_rx(int domain, int type, int protocol)
246 {
247 struct sockaddr_storage addr = {};
248 struct thread_data *td;
249 int listen_fd, fd;
250 unsigned int i;
251
252 fd = socket(domain, type, protocol);
253 if (fd == -1)
254 t_error(1, errno, "socket r");
255
256 do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
257
258 setup_sockaddr(cfg_family, str_addr, &addr);
259
260 if (bind(fd, (void *)&addr, cfg_alen))
261 t_error(1, errno, "bind");
262
263 if (type != SOCK_STREAM) {
264 if (cfg_nr_threads != 1)
265 t_error(1, 0, "udp rx cant multithread");
266 threads[0].fd = fd;
267 return;
268 }
269
270 listen_fd = fd;
271 if (listen(listen_fd, cfg_nr_threads))
272 t_error(1, errno, "listen");
273
274 for (i = 0; i < cfg_nr_threads; i++) {
275 td = &threads[i];
276
277 fd = accept(listen_fd, NULL, NULL);
278 if (fd == -1)
279 t_error(1, errno, "accept");
280 td->fd = fd;
281 }
282
283 if (close(listen_fd))
284 t_error(1, errno, "close listen sock");
285 }
286
do_rx(void * arg)287 static void *do_rx(void *arg)
288 {
289 struct thread_data *td = arg;
290 const int cfg_receiver_wait_ms = 400;
291 uint64_t tstop;
292 int ret, fd = td->fd;
293
294 tstop = gettimeofday_ms() + cfg_runtime_ms + cfg_receiver_wait_ms;
295 do {
296 if (cfg_type == SOCK_STREAM)
297 ret = do_flush_tcp(td, fd);
298 else
299 ret = do_flush_datagram(td, fd);
300
301 if (ret)
302 break;
303
304 do_poll(fd, POLLIN);
305 } while (gettimeofday_ms() < tstop);
306
307 if (close(fd))
308 t_error(1, errno, "close");
309 pthread_exit(&td->ret);
310 return NULL;
311 }
312
/*
 * Return the next completion on @ring, waiting only when none is ready.
 *
 * The io_uring_for_each_cqe() loop returns on its first iteration, i.e. it
 * hands back the oldest CQE already present in the completion ring. Only
 * when the ring is empty does it fall back to io_uring_wait_cqe().
 * The CQE is not consumed: callers mark it with io_uring_cqe_seen().
 * A wait failure is fatal.
 */
static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int ret;

	/* fast path: something is already posted */
	io_uring_for_each_cqe(ring, head, cqe)
		return cqe;

	/* slow path: block until a completion arrives */
	ret = io_uring_wait_cqe(ring, &cqe);
	if (ret)
		t_error(1, ret, "wait cqe");
	return cqe;
}
327
/*
 * Sender thread body.
 *
 * Connects a socket(@domain, @type, @protocol) to td->dst_addr and sets up
 * an io_uring for it. After all sender threads meet at the barrier, it
 * repeatedly queues a batch of cfg_nr_reqs send (or send_zc with -z) requests
 * and reaps their completions. It stops once cfg_runtime_ms has elapsed or
 * SIGINT has set should_stop.
 *
 * Successful completions are summed into td->packets / td->bytes and the
 * elapsed time into td->dt_ms (not set when bailing out on a connection
 * failure). All other errors are fatal.
 *
 * Zerocopy bookkeeping: a completion flagged IORING_CQE_F_MORE is counted
 * in compl_cqes. Each IORING_CQE_F_NOTIF notification decrements the count,
 * and the remainder is drained before the ring is torn down.
 */
static void do_tx(struct thread_data *td, int domain, int type, int protocol)
{
	/* with -d: threshold of outstanding notifications, see the submit below */
	const int notif_slack = 128;
	struct io_uring ring;
	struct iovec iov;
	uint64_t tstart;
	int i, fd, ret;
	/* F_MORE completions whose notification CQE has not been seen yet */
	int compl_cqes = 0;
	/* the ring is only used from this thread */
	int ring_flags = IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER;
	/* batch counter; the clock is only read every 16th batch */
	unsigned loop = 0;

	if (cfg_defer_taskrun)
		ring_flags |= IORING_SETUP_DEFER_TASKRUN;

	fd = socket(domain, type, protocol);
	if (fd == -1)
		t_error(1, errno, "socket t");

	if (connect(fd, (void *)&td->dst_addr, cfg_alen))
		t_error(1, errno, "connect, idx %i", td->idx);

	ret = io_uring_queue_init(512, &ring, ring_flags);
	if (ret)
		t_error(1, ret, "io_uring: queue init");

	set_cpu_affinity();
	set_iowq_affinity(&ring);

	/* the socket becomes fixed file 0, referenced as sqe->fd = 0 below */
	if (cfg_fixed_files) {
		ret = io_uring_register_files(&ring, &fd, 1);
		if (ret < 0)
			t_error(1, ret, "io_uring: files registration");
	}
	if (cfg_reg_ringfd) {
		ret = io_uring_register_ring_fd(&ring);
		if (ret < 0)
			t_error(1, ret, "io_uring: io_uring_register_ring_fd");
	}

	/*
	 * The payload is always registered as buffer 0; it is only referenced
	 * (via IORING_RECVSEND_FIXED_BUF) by zerocopy sends with cfg_fixed_buf.
	 */
	iov.iov_base = payload;
	iov.iov_len = cfg_payload_len;

	ret = io_uring_register_buffers(&ring, &iov, 1);
	if (ret)
		t_error(1, ret, "io_uring: buffer registration");

	/*
	 * -y: arm a POLLIN poll request on the socket before sending starts.
	 * NOTE(review): its CQE has user_data 0 and no F_NOTIF flag, so if it
	 * ever fires, the reaping loop below counts it as a send completion.
	 */
	if (cfg_rx_poll) {
		struct io_uring_sqe *sqe;

		sqe = io_uring_get_sqe(&ring);
		io_uring_prep_poll_add(sqe, fd, POLLIN);

		ret = io_uring_submit(&ring);
		if (ret != 1)
			t_error(1, ret, "submit poll");
	}

	/* start timing together with the other sender threads */
	pthread_barrier_wait(&barrier);

	tstart = gettimeofday_ms();
	do {
		struct io_uring_sqe *sqe;
		struct io_uring_cqe *cqe;
		unsigned buf_idx = 0;
		unsigned msg_flags = MSG_WAITALL;

		/* queue one batch of cfg_nr_reqs sends of the whole payload */
		for (i = 0; i < cfg_nr_reqs; i++) {
			sqe = io_uring_get_sqe(&ring);

			if (!cfg_zc)
				io_uring_prep_send(sqe, fd, payload,
						   cfg_payload_len, 0);
			else {
				io_uring_prep_send_zc(sqe, fd, payload,
						     cfg_payload_len, msg_flags, 0);
				if (cfg_fixed_buf) {
					sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
					sqe->buf_index = buf_idx;
				}
			}
			sqe->user_data = 1;
			if (cfg_fixed_files) {
				sqe->fd = 0;
				sqe->flags |= IOSQE_FIXED_FILE;
			}
		}

		/*
		 * With -d and many notifications outstanding, also fetch events
		 * while submitting (presumably so deferred notification CQEs get
		 * posted; verify).
		 */
		if (cfg_defer_taskrun && compl_cqes >= notif_slack)
			ret = io_uring_submit_and_get_events(&ring);
		else
			ret = io_uring_submit(&ring);

		if (ret != cfg_nr_reqs)
			t_error(1, ret, "submit");

		/* reap the batch; notification CQEs do not count (hence i--) */
		for (i = 0; i < cfg_nr_reqs; i++) {
			cqe = wait_cqe_fast(&ring);

			if (cqe->flags & IORING_CQE_F_NOTIF) {
				if (cqe->flags & IORING_CQE_F_MORE)
					t_error(1, -EINVAL, "F_MORE notif");
				compl_cqes--;
				i--;
				io_uring_cqe_seen(&ring, cqe);
				continue;
			}
			if (cqe->flags & IORING_CQE_F_MORE)
				compl_cqes++;

			if (cqe->res >= 0) {
				td->packets++;
				td->bytes += cqe->res;
			} else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE ||
				   cqe->res == -ECONNRESET) {
				fprintf(stderr, "Connection failure\n");
				goto out_fail;
			} else if (cqe->res != -EAGAIN) {
				/* -EAGAIN is tolerated and simply not counted */
				t_error(1, cqe->res, "send failed");
			}
			io_uring_cqe_seen(&ring, cqe);
		}
		if (should_stop)
			break;
	} while ((++loop % 16 != 0) || gettimeofday_ms() < tstart + cfg_runtime_ms);

	td->dt_ms = gettimeofday_ms() - tstart;

	/*
	 * NOTE(review): on the connection-failure path the failing CQE has not
	 * been marked seen, so the drain loop below gets it back from
	 * wait_cqe_fast() and counts it against compl_cqes.
	 */
out_fail:
	shutdown(fd, SHUT_RDWR);
	if (close(fd))
		t_error(1, errno, "close");

	/* wait for the outstanding notifications (any CQE decrements the count) */
	while (compl_cqes) {
		struct io_uring_cqe *cqe = wait_cqe_fast(&ring);

		io_uring_cqe_seen(&ring, cqe);
		compl_cqes--;
	}
	io_uring_queue_exit(&ring);
}
468
do_test(void * arg)469 static void *do_test(void *arg)
470 {
471 struct thread_data *td = arg;
472 int protocol = 0;
473
474 setup_sockaddr(cfg_family, str_addr, &td->dst_addr);
475
476 do_tx(td, cfg_family, cfg_type, protocol);
477 pthread_exit(&td->ret);
478 return NULL;
479 }
480
/*
 * Print the command-line synopsis and option list to stdout.
 * @filepath is the program name shown in the synopsis (normally argv[0]).
 */
static void usage(const char *filepath)
{
	printf("Usage:\t%s <protocol> <ip-version> -D<addr> [options]\n", filepath);
	printf("\t%s <protocol> <ip-version> -R [options]\n\n", filepath);

	printf(" -4\t\tUse IPv4\n");
	printf(" -6\t\tUse IPv6\n");
	printf(" -D <address>\tDestination address\n");
	printf(" -p <port>\tServer port to listen on/connect to\n");
	printf(" -s <size>\tBytes per request\n");
	printf(" -n <nr>\tNumber of parallel requests\n");
	printf(" -z <mode>\tZerocopy mode, 0 to disable, enabled otherwise\n");
	printf(" -b <mode>\tUse registered buffers\n");
	printf(" -l <mode>\tUse huge pages\n");
	printf(" -d\t\tUse defer taskrun\n");
	printf(" -C <cpu>\tPin to the specified CPU\n");
	printf(" -T <nr>\tNumber of threads to use for sending\n");
	printf(" -R\t\tPlay the server role\n");
	printf(" -t <seconds>\tTime in seconds\n");
	printf(" -y\t\tArm a POLLIN poll request on the sending socket\n");
}
502
parse_opts(int argc,char ** argv)503 static void parse_opts(int argc, char **argv)
504 {
505 const int max_payload_len = IP_MAXPACKET -
506 sizeof(struct ipv6hdr) -
507 sizeof(struct tcphdr) -
508 40 /* max tcp options */;
509 int c;
510 char *daddr = NULL;
511
512 if (argc <= 1) {
513 usage(argv[0]);
514 exit(0);
515 }
516
517 cfg_payload_len = max_payload_len;
518
519 while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:Ry")) != -1) {
520 switch (c) {
521 case '4':
522 if (cfg_family != PF_UNSPEC)
523 t_error(1, 0, "Pass one of -4 or -6");
524 cfg_family = PF_INET;
525 cfg_alen = sizeof(struct sockaddr_in);
526 break;
527 case '6':
528 if (cfg_family != PF_UNSPEC)
529 t_error(1, 0, "Pass one of -4 or -6");
530 cfg_family = PF_INET6;
531 cfg_alen = sizeof(struct sockaddr_in6);
532 break;
533 case 'D':
534 daddr = optarg;
535 break;
536 case 'p':
537 cfg_port = strtoul(optarg, NULL, 0);
538 break;
539 case 's':
540 cfg_payload_len = strtoul(optarg, NULL, 0);
541 break;
542 case 't':
543 cfg_runtime_ms = 200 + strtoul(optarg, NULL, 10) * 1000;
544 break;
545 case 'n':
546 cfg_nr_reqs = strtoul(optarg, NULL, 0);
547 break;
548 case 'z':
549 cfg_zc = strtoul(optarg, NULL, 0);
550 break;
551 case 'b':
552 cfg_fixed_buf = strtoul(optarg, NULL, 0);
553 break;
554 case 'l':
555 cfg_hugetlb = strtoul(optarg, NULL, 0);
556 break;
557 case 'd':
558 cfg_defer_taskrun = 1;
559 break;
560 case 'C':
561 cfg_cpu = strtol(optarg, NULL, 0);
562 break;
563 case 'T':
564 cfg_nr_threads = strtol(optarg, NULL, 0);
565 if (cfg_nr_threads > MAX_THREADS)
566 t_error(1, 0, "too many threads\n");
567 break;
568 case 'R':
569 cfg_rx = 1;
570 break;
571 case 'y':
572 cfg_rx_poll = 1;
573 break;
574 }
575 }
576
577 if (cfg_nr_reqs > MAX_SUBMIT_NR)
578 t_error(1, 0, "-n: submit batch nr exceeds max (%d)", MAX_SUBMIT_NR);
579 if (cfg_payload_len > max_payload_len)
580 t_error(1, 0, "-s: payload exceeds max (%d)", max_payload_len);
581
582 str_addr = daddr;
583
584 if (optind != argc - 1)
585 usage(argv[0]);
586 }
587
main(int argc,char ** argv)588 int main(int argc, char **argv)
589 {
590 unsigned long long tsum = 0;
591 unsigned long long packets = 0, bytes = 0;
592 struct thread_data *td;
593 const char *cfg_test;
594 unsigned int i;
595 void *res;
596
597 parse_opts(argc, argv);
598 set_cpu_affinity();
599
600 payload = payload_buf;
601 if (cfg_hugetlb) {
602 payload = mmap(NULL, 2*1024*1024, PROT_READ | PROT_WRITE,
603 MAP_PRIVATE | MAP_HUGETLB | MAP_HUGE_2MB | MAP_ANONYMOUS,
604 -1, 0);
605 if (payload == MAP_FAILED) {
606 fprintf(stderr, "hugetlb alloc failed\n");
607 return 1;
608 }
609 }
610
611 cfg_test = argv[argc - 1];
612 if (!strcmp(cfg_test, "tcp"))
613 cfg_type = SOCK_STREAM;
614 else if (!strcmp(cfg_test, "udp"))
615 cfg_type = SOCK_DGRAM;
616 else
617 t_error(1, 0, "unknown cfg_test %s", cfg_test);
618
619 pthread_barrier_init(&barrier, NULL, cfg_nr_threads);
620
621 for (i = 0; i < IP_MAXPACKET; i++)
622 payload[i] = 'a' + (i % 26);
623
624 for (i = 0; i < cfg_nr_threads; i++) {
625 td = &threads[i];
626 td->idx = i;
627 }
628
629 if (cfg_rx)
630 do_setup_rx(cfg_family, cfg_type, 0);
631
632 if (!cfg_rx)
633 signal(SIGINT, sigint_handler);
634
635 for (i = 0; i < cfg_nr_threads; i++)
636 pthread_create(&threads[i].thread, NULL,
637 !cfg_rx ? do_test : do_rx, &threads[i]);
638
639 for (i = 0; i < cfg_nr_threads; i++) {
640 td = &threads[i];
641 pthread_join(td->thread, &res);
642 packets += td->packets;
643 bytes += td->bytes;
644 tsum += td->dt_ms;
645 }
646 tsum = tsum / cfg_nr_threads;
647
648 if (!tsum) {
649 printf("The run is too short, can't gather stats\n");
650 } else {
651 printf("packets=%llu (MB=%llu), rps=%llu (MB/s=%llu)\n",
652 packets, bytes >> 20,
653 packets * 1000 / tsum,
654 (bytes >> 20) * 1000 / tsum);
655 }
656 pthread_barrier_destroy(&barrier);
657 return 0;
658 }
659