• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /* based on linux-kernel/tools/testing/selftests/net/msg_zerocopy.c */
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <stdint.h>
6 #include <assert.h>
7 #include <errno.h>
8 #include <limits.h>
9 #include <fcntl.h>
10 #include <unistd.h>
11 #include <stdbool.h>
12 #include <stdarg.h>
13 #include <string.h>
14 #include <pthread.h>
15 
16 #include <poll.h>
17 #include <sched.h>
18 #include <arpa/inet.h>
19 #include <linux/if_packet.h>
20 #include <linux/ipv6.h>
21 #include <linux/socket.h>
22 #include <linux/sockios.h>
23 #include <net/ethernet.h>
24 #include <net/if.h>
25 #include <netinet/ip.h>
26 #include <netinet/in.h>
27 #include <netinet/ip6.h>
28 #include <netinet/tcp.h>
29 #include <netinet/udp.h>
30 #include <sys/socket.h>
31 #include <sys/time.h>
32 #include <sys/resource.h>
33 #include <sys/un.h>
34 #include <sys/ioctl.h>
35 #include <sys/socket.h>
36 #include <sys/stat.h>
37 #include <sys/time.h>
38 #include <sys/types.h>
39 #include <sys/wait.h>
40 #include <sys/mman.h>
41 #include <linux/mman.h>
42 #include <signal.h>
43 
44 #include "liburing.h"
45 
#define ZC_TAG 0xfffffffULL	/* NOTE(review): appears unused in this file */
#define MAX_SUBMIT_NR 512	/* upper bound for -n (requests per batch) */
#define MAX_THREADS 100		/* upper bound for -T (worker threads) */

/* Per-worker state and result counters for one tx or rx thread. */
struct thread_data {
	pthread_t thread;
	void *ret;			/* storage handed to pthread_exit() */
	int idx;			/* thread index, used in error messages */
	unsigned long long packets;	/* requests completed / datagrams seen */
	unsigned long long bytes;	/* payload bytes sent or received */
	unsigned long long dt_ms;	/* measured tx loop duration in ms */
	struct sockaddr_storage dst_addr;	/* connect() target (tx side) */
	int fd;				/* socket this thread operates on */
};
60 
61 static bool cfg_reg_ringfd = true;
62 static bool cfg_fixed_files = 1;
63 static bool cfg_zc = 1;
64 static int  cfg_nr_reqs = 8;
65 static bool cfg_fixed_buf = 1;
66 static bool cfg_hugetlb = 0;
67 static bool cfg_defer_taskrun = 0;
68 static int  cfg_cpu = -1;
69 static bool cfg_rx = 0;
70 static unsigned  cfg_nr_threads = 1;
71 
72 static int  cfg_family		= PF_UNSPEC;
73 static int  cfg_type		= 0;
74 static int  cfg_payload_len;
75 static int  cfg_port		= 8000;
76 static int  cfg_runtime_ms	= 4200;
77 static bool cfg_rx_poll		= false;
78 
79 static socklen_t cfg_alen;
80 static char *str_addr = NULL;
81 
82 static char payload_buf[IP_MAXPACKET] __attribute__((aligned(4096)));
83 static char *payload;
84 static struct thread_data threads[MAX_THREADS];
85 static pthread_barrier_t barrier;
86 
87 static bool should_stop = false;
88 
sigint_handler(int sig)89 static void sigint_handler(__attribute__((__unused__)) int sig)
90 {
91 	/* kill if should_stop can't unblock threads fast enough */
92 	if (should_stop)
93 		_exit(-1);
94 	should_stop = true;
95 }
96 
/*
 * Minimal stand-in for glibc error(3): print the formatted message to
 * stderr, append ": strerror(errnum)" when errnum is non-zero, then
 * terminate the process with exit(status).
 */
static void t_error(int status, int errnum, const char *format, ...)
{
	va_list ap;

	va_start(ap, format);
	vfprintf(stderr, format, ap);
	va_end(ap);

	if (errnum)
		fprintf(stderr, ": %s", strerror(errnum));
	fprintf(stderr, "\n");

	exit(status);
}
113 
set_cpu_affinity(void)114 static void set_cpu_affinity(void)
115 {
116 	cpu_set_t mask;
117 
118 	if (cfg_cpu == -1)
119 		return;
120 
121 	CPU_ZERO(&mask);
122 	CPU_SET(cfg_cpu, &mask);
123 	if (sched_setaffinity(0, sizeof(mask), &mask))
124 		t_error(1, errno, "unable to pin cpu\n");
125 }
126 
set_iowq_affinity(struct io_uring * ring)127 static void set_iowq_affinity(struct io_uring *ring)
128 {
129 	cpu_set_t mask;
130 	int ret;
131 
132 	if (cfg_cpu == -1)
133 		return;
134 
135 	CPU_ZERO(&mask);
136 	CPU_SET(cfg_cpu, &mask);
137 	ret = io_uring_register_iowq_aff(ring, 1, &mask);
138 	if (ret)
139 		t_error(1, ret, "unabled to set io-wq affinity\n");
140 }
141 
/* Current wall-clock time, truncated to millisecond resolution. */
static unsigned long gettimeofday_ms(void)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	return now.tv_sec * 1000 + now.tv_usec / 1000;
}
149 
/* setsockopt() wrapper for int-valued options; exits on failure. */
static void do_setsockopt(int fd, int level, int optname, int val)
{
	int rc = setsockopt(fd, level, optname, &val, sizeof(val));

	if (rc != 0)
		t_error(1, errno, "setsockopt %d.%d: %d", level, optname, val);
}
155 
setup_sockaddr(int domain,const char * str_addr,struct sockaddr_storage * sockaddr)156 static void setup_sockaddr(int domain, const char *str_addr,
157 			   struct sockaddr_storage *sockaddr)
158 {
159 	struct sockaddr_in6 *addr6 = (void *) sockaddr;
160 	struct sockaddr_in *addr4 = (void *) sockaddr;
161 	int port = cfg_port;
162 
163 	switch (domain) {
164 	case PF_INET:
165 		memset(addr4, 0, sizeof(*addr4));
166 		addr4->sin_family = AF_INET;
167 		addr4->sin_port = htons(port);
168 		if (str_addr &&
169 		    inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
170 			t_error(1, 0, "ipv4 parse error: %s", str_addr);
171 		break;
172 	case PF_INET6:
173 		memset(addr6, 0, sizeof(*addr6));
174 		addr6->sin6_family = AF_INET6;
175 		addr6->sin6_port = htons(port);
176 		if (str_addr &&
177 		    inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
178 			t_error(1, 0, "ipv6 parse error: %s", str_addr);
179 		break;
180 	default:
181 		t_error(1, 0, "illegal domain");
182 	}
183 }
184 
/*
 * Block (infinite timeout) until one of @events is pending on @fd.
 * Returns non-zero iff a requested event was reported.
 */
static int do_poll(int fd, int events)
{
	struct pollfd pfd = {
		.fd = fd,
		.events = events,
		.revents = 0,
	};
	int nready = poll(&pfd, 1, -1);

	if (nready == -1)
		t_error(1, errno, "poll");

	return nready && (pfd.revents & events);
}
200 
201 /* Flush all outstanding bytes for the tcp receive queue */
do_flush_tcp(struct thread_data * td,int fd)202 static int do_flush_tcp(struct thread_data *td, int fd)
203 {
204 	int ret;
205 
206 	/* MSG_TRUNC flushes up to len bytes */
207 	ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
208 	if (ret == -1 && errno == EAGAIN)
209 		return 0;
210 	if (ret == -1)
211 		t_error(1, errno, "flush");
212 	if (!ret)
213 		return 1;
214 
215 	td->packets++;
216 	td->bytes += ret;
217 	return 0;
218 }
219 
220 /* Flush all outstanding datagrams. Verify first few bytes of each. */
do_flush_datagram(struct thread_data * td,int fd)221 static int do_flush_datagram(struct thread_data *td, int fd)
222 {
223 	long ret, off = 0;
224 	char buf[64];
225 
226 	/* MSG_TRUNC will return full datagram length */
227 	ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC);
228 	if (ret == -1 && errno == EAGAIN)
229 		return 0;
230 
231 	if (ret == -1)
232 		t_error(1, errno, "recv");
233 	if (ret != cfg_payload_len)
234 		t_error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len);
235 	if ((unsigned long) ret > sizeof(buf) - off)
236 		ret = sizeof(buf) - off;
237 	if (memcmp(buf + off, payload, ret))
238 		t_error(1, 0, "recv: data mismatch");
239 
240 	td->packets++;
241 	td->bytes += cfg_payload_len;
242 	return 0;
243 }
244 
do_setup_rx(int domain,int type,int protocol)245 static void do_setup_rx(int domain, int type, int protocol)
246 {
247 	struct sockaddr_storage addr = {};
248 	struct thread_data *td;
249 	int listen_fd, fd;
250 	unsigned int i;
251 
252 	fd = socket(domain, type, protocol);
253 	if (fd == -1)
254 		t_error(1, errno, "socket r");
255 
256 	do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
257 
258 	setup_sockaddr(cfg_family, str_addr, &addr);
259 
260 	if (bind(fd, (void *)&addr, cfg_alen))
261 		t_error(1, errno, "bind");
262 
263 	if (type != SOCK_STREAM) {
264 		if (cfg_nr_threads != 1)
265 			t_error(1, 0, "udp rx cant multithread");
266 		threads[0].fd = fd;
267 		return;
268 	}
269 
270 	listen_fd = fd;
271 	if (listen(listen_fd, cfg_nr_threads))
272 		t_error(1, errno, "listen");
273 
274 	for (i = 0; i < cfg_nr_threads; i++) {
275 		td = &threads[i];
276 
277 		fd = accept(listen_fd, NULL, NULL);
278 		if (fd == -1)
279 			t_error(1, errno, "accept");
280 		td->fd = fd;
281 	}
282 
283 	if (close(listen_fd))
284 		t_error(1, errno, "close listen sock");
285 }
286 
do_rx(void * arg)287 static void *do_rx(void *arg)
288 {
289 	struct thread_data *td = arg;
290 	const int cfg_receiver_wait_ms = 400;
291 	uint64_t tstop;
292 	int ret, fd = td->fd;
293 
294 	tstop = gettimeofday_ms() + cfg_runtime_ms + cfg_receiver_wait_ms;
295 	do {
296 		if (cfg_type == SOCK_STREAM)
297 			ret = do_flush_tcp(td, fd);
298 		else
299 			ret = do_flush_datagram(td, fd);
300 
301 		if (ret)
302 			break;
303 
304 		do_poll(fd, POLLIN);
305 	} while (gettimeofday_ms() < tstop);
306 
307 	if (close(fd))
308 		t_error(1, errno, "close");
309 	pthread_exit(&td->ret);
310 	return NULL;
311 }
312 
/*
 * Fetch the next CQE: peek the CQ ring first and only fall back to a
 * blocking wait when it is empty. Exits via t_error() on wait failure.
 */
static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int ret;

	/* fast path: return the first already-completed CQE, if any */
	io_uring_for_each_cqe(ring, head, cqe)
		return cqe;

	/* slow path: block until at least one CQE arrives */
	ret = io_uring_wait_cqe(ring, &cqe);
	if (ret)
		t_error(1, ret, "wait cqe");
	return cqe;
}
327 
/*
 * Sender loop. Creates a connected socket and a private io_uring, then
 * submits batches of cfg_nr_reqs send (or send_zc) requests until the
 * runtime expires or SIGINT sets should_stop. For zerocopy sends each
 * request produces an extra notification CQE (IORING_CQE_F_NOTIF);
 * those in flight are tracked in compl_cqes and drained before teardown.
 */
static void do_tx(struct thread_data *td, int domain, int type, int protocol)
{
	/* with DEFER_TASKRUN, force event processing once this many
	 * notification CQEs are outstanding */
	const int notif_slack = 128;
	struct io_uring ring;
	struct iovec iov;
	uint64_t tstart;
	int i, fd, ret;
	int compl_cqes = 0;	/* zc notifications still expected */
	int ring_flags = IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER;
	unsigned loop = 0;

	if (cfg_defer_taskrun)
		ring_flags |= IORING_SETUP_DEFER_TASKRUN;

	fd = socket(domain, type, protocol);
	if (fd == -1)
		t_error(1, errno, "socket t");

	if (connect(fd, (void *)&td->dst_addr, cfg_alen))
		t_error(1, errno, "connect, idx %i", td->idx);

	ret = io_uring_queue_init(512, &ring, ring_flags);
	if (ret)
		t_error(1, ret, "io_uring: queue init");

	set_cpu_affinity();
	set_iowq_affinity(&ring);

	if (cfg_fixed_files) {
		/* fd becomes fixed-file slot 0 */
		ret = io_uring_register_files(&ring, &fd, 1);
		if (ret < 0)
			t_error(1, ret, "io_uring: files registration");
	}
	if (cfg_reg_ringfd) {
		ret = io_uring_register_ring_fd(&ring);
		if (ret < 0)
			t_error(1, ret, "io_uring: io_uring_register_ring_fd");
	}

	/* register the payload as fixed buffer index 0 */
	iov.iov_base = payload;
	iov.iov_len = cfg_payload_len;

	ret = io_uring_register_buffers(&ring, &iov, 1);
	if (ret)
		t_error(1, ret, "io_uring: buffer registration");

	if (cfg_rx_poll) {
		/* -y: leave one POLLIN request pending on the socket */
		struct io_uring_sqe *sqe;

		sqe = io_uring_get_sqe(&ring);
		io_uring_prep_poll_add(sqe, fd, POLLIN);

		ret = io_uring_submit(&ring);
		if (ret != 1)
			t_error(1, ret, "submit poll");
	}

	/* start all tx threads at the same instant */
	pthread_barrier_wait(&barrier);

	tstart = gettimeofday_ms();
	do {
		struct io_uring_sqe *sqe;
		struct io_uring_cqe *cqe;
		unsigned buf_idx = 0;
		unsigned msg_flags = MSG_WAITALL;

		/* queue one batch of sends */
		for (i = 0; i < cfg_nr_reqs; i++) {
			sqe = io_uring_get_sqe(&ring);

			if (!cfg_zc)
				io_uring_prep_send(sqe, fd, payload,
						   cfg_payload_len, 0);
			else {
				io_uring_prep_send_zc(sqe, fd, payload,
						     cfg_payload_len, msg_flags, 0);
				if (cfg_fixed_buf) {
					sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
					sqe->buf_index = buf_idx;
				}
			}
			sqe->user_data = 1;
			if (cfg_fixed_files) {
				/* slot 0 of the registered file table */
				sqe->fd = 0;
				sqe->flags |= IOSQE_FIXED_FILE;
			}
		}

		/* with deferred task running, flush pending events once
		 * too many notifications have piled up */
		if (cfg_defer_taskrun && compl_cqes >= notif_slack)
			ret = io_uring_submit_and_get_events(&ring);
		else
			ret = io_uring_submit(&ring);

		if (ret != cfg_nr_reqs)
			t_error(1, ret, "submit");

		/* reap one completion per queued request; notification
		 * CQEs don't count towards the batch (i--) */
		for (i = 0; i < cfg_nr_reqs; i++) {
			cqe = wait_cqe_fast(&ring);

			if (cqe->flags & IORING_CQE_F_NOTIF) {
				if (cqe->flags & IORING_CQE_F_MORE)
					t_error(1, -EINVAL, "F_MORE notif");
				compl_cqes--;
				i--;
				io_uring_cqe_seen(&ring, cqe);
				continue;
			}
			/* F_MORE on a send CQE: a notification will follow */
			if (cqe->flags & IORING_CQE_F_MORE)
				compl_cqes++;

			if (cqe->res >= 0) {
				td->packets++;
				td->bytes += cqe->res;
			} else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE ||
				   cqe->res == -ECONNRESET) {
				/* peer went away: bail out, skip stats */
				fprintf(stderr, "Connection failure\n");
				goto out_fail;
			} else if (cqe->res != -EAGAIN) {
				t_error(1, cqe->res, "send failed");
			}
			io_uring_cqe_seen(&ring, cqe);
		}
		if (should_stop)
			break;
		/* check the clock only every 16th batch to keep
		 * gettimeofday() off the hot path */
	} while ((++loop % 16 != 0) || gettimeofday_ms() < tstart + cfg_runtime_ms);

	td->dt_ms = gettimeofday_ms() - tstart;

out_fail:
	shutdown(fd, SHUT_RDWR);
	if (close(fd))
		t_error(1, errno, "close");

	/* drain the zerocopy notifications still in flight */
	while (compl_cqes) {
		struct io_uring_cqe *cqe = wait_cqe_fast(&ring);

		io_uring_cqe_seen(&ring, cqe);
		compl_cqes--;
	}
	io_uring_queue_exit(&ring);
}
468 
do_test(void * arg)469 static void *do_test(void *arg)
470 {
471 	struct thread_data *td = arg;
472 	int protocol = 0;
473 
474 	setup_sockaddr(cfg_family, str_addr, &td->dst_addr);
475 
476 	do_tx(td, cfg_family, cfg_type, protocol);
477 	pthread_exit(&td->ret);
478 	return NULL;
479 }
480 
/*
 * Print command line help. Fixes: "-6" wrongly said "Use IPv4", the
 * "-s" line was printed twice, and "-y" was undocumented.
 */
static void usage(const char *filepath)
{
	printf("Usage:\t%s <protocol> <ip-version> -D<addr> [options]\n", filepath);
	printf("\t%s <protocol> <ip-version> -R [options]\n\n", filepath);

	printf("  -4\t\tUse IPv4\n");
	printf("  -6\t\tUse IPv6\n");
	printf("  -D <address>\tDestination address\n");
	printf("  -p <port>\tServer port to listen on/connect to\n");
	printf("  -s <size>\tBytes per request\n");
	printf("  -n <nr>\tNumber of parallel requests\n");
	printf("  -z <mode>\tZerocopy mode, 0 to disable, enabled otherwise\n");
	printf("  -b <mode>\tUse registered buffers\n");
	printf("  -l <mode>\tUse huge pages\n");
	printf("  -d\t\tUse defer taskrun\n");
	printf("  -C <cpu>\tPin to the specified CPU\n");
	printf("  -T <nr>\tNumber of threads to use for sending\n");
	printf("  -R\t\tPlay the server role\n");
	printf("  -y\t\tSubmit a POLLIN poll request before sending\n");
	printf("  -t <seconds>\tTime in seconds\n");
}
502 
/*
 * Parse command line options into the cfg_* globals; exits via t_error()
 * on invalid combinations. The trailing positional argument ("tcp" or
 * "udp") is consumed later, in main().
 */
static void parse_opts(int argc, char **argv)
{
	/* largest tcp payload that still fits one max-size IP packet */
	const int max_payload_len = IP_MAXPACKET -
				    sizeof(struct ipv6hdr) -
				    sizeof(struct tcphdr) -
				    40 /* max tcp options */;
	int c;
	char *daddr = NULL;

	if (argc <= 1) {
		usage(argv[0]);
		exit(0);
	}

	/* default payload: the biggest that fits */
	cfg_payload_len = max_payload_len;

	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:Ry")) != -1) {
		switch (c) {
		case '4':
			if (cfg_family != PF_UNSPEC)
				t_error(1, 0, "Pass one of -4 or -6");
			cfg_family = PF_INET;
			cfg_alen = sizeof(struct sockaddr_in);
			break;
		case '6':
			if (cfg_family != PF_UNSPEC)
				t_error(1, 0, "Pass one of -4 or -6");
			cfg_family = PF_INET6;
			cfg_alen = sizeof(struct sockaddr_in6);
			break;
		case 'D':
			daddr = optarg;
			break;
		case 'p':
			cfg_port = strtoul(optarg, NULL, 0);
			break;
		case 's':
			cfg_payload_len = strtoul(optarg, NULL, 0);
			break;
		case 't':
			/* seconds -> ms, plus 200ms of slack */
			cfg_runtime_ms = 200 + strtoul(optarg, NULL, 10) * 1000;
			break;
		case 'n':
			cfg_nr_reqs = strtoul(optarg, NULL, 0);
			break;
		case 'z':
			cfg_zc = strtoul(optarg, NULL, 0);
			break;
		case 'b':
			cfg_fixed_buf = strtoul(optarg, NULL, 0);
			break;
		case 'l':
			cfg_hugetlb = strtoul(optarg, NULL, 0);
			break;
		case 'd':
			cfg_defer_taskrun = 1;
			break;
		case 'C':
			cfg_cpu = strtol(optarg, NULL, 0);
			break;
		case 'T':
			cfg_nr_threads = strtol(optarg, NULL, 0);
			if (cfg_nr_threads > MAX_THREADS)
				t_error(1, 0, "too many threads\n");
			break;
		case 'R':
			cfg_rx = 1;
			break;
		case 'y':
			cfg_rx_poll = 1;
			break;
		}
		/* NOTE(review): no default case — unknown options ('?')
		 * are silently ignored; confirm that is intended */
	}

	if (cfg_nr_reqs > MAX_SUBMIT_NR)
		t_error(1, 0, "-n: submit batch nr exceeds max (%d)", MAX_SUBMIT_NR);
	if (cfg_payload_len > max_payload_len)
		t_error(1, 0, "-s: payload exceeds max (%d)", max_payload_len);

	str_addr = daddr;

	/* NOTE(review): usage() does not exit, so execution continues
	 * even when the positional argument count is wrong — confirm */
	if (optind != argc - 1)
		usage(argv[0]);
}
587 
/*
 * Entry point: parse options, prepare the payload buffer, spawn one
 * worker per thread (sender by default, receiver with -R), then join
 * them and print aggregate throughput statistics.
 */
int main(int argc, char **argv)
{
	unsigned long long tsum = 0;
	unsigned long long packets = 0, bytes = 0;
	struct thread_data *td;
	const char *cfg_test;
	unsigned int i;
	void *res;

	parse_opts(argc, argv);
	set_cpu_affinity();

	payload = payload_buf;
	if (cfg_hugetlb) {
		/* replace the static buffer with one 2MB huge page;
		 * only IP_MAXPACKET bytes of it are ever used */
		payload = mmap(NULL, 2*1024*1024, PROT_READ | PROT_WRITE,
				MAP_PRIVATE | MAP_HUGETLB | MAP_HUGE_2MB | MAP_ANONYMOUS,
				-1, 0);
		if (payload == MAP_FAILED) {
			fprintf(stderr, "hugetlb alloc failed\n");
			return 1;
		}
	}

	/* last positional argument selects the socket type */
	cfg_test = argv[argc - 1];
	if (!strcmp(cfg_test, "tcp"))
		cfg_type = SOCK_STREAM;
	else if (!strcmp(cfg_test, "udp"))
		cfg_type = SOCK_DGRAM;
	else
		t_error(1, 0, "unknown cfg_test %s", cfg_test);

	pthread_barrier_init(&barrier, NULL, cfg_nr_threads);

	/* fill the payload with a recognizable repeating pattern */
	for (i = 0; i < IP_MAXPACKET; i++)
		payload[i] = 'a' + (i % 26);

	for (i = 0; i < cfg_nr_threads; i++) {
		td = &threads[i];
		td->idx = i;
	}

	if (cfg_rx)
		do_setup_rx(cfg_family, cfg_type, 0);

	/* only the sender handles SIGINT for a graceful stop */
	if (!cfg_rx)
		signal(SIGINT, sigint_handler);

	for (i = 0; i < cfg_nr_threads; i++)
		pthread_create(&threads[i].thread, NULL,
				!cfg_rx ? do_test : do_rx, &threads[i]);

	/* aggregate per-thread counters; dt_ms is averaged below */
	for (i = 0; i < cfg_nr_threads; i++) {
		td = &threads[i];
		pthread_join(td->thread, &res);
		packets += td->packets;
		bytes += td->bytes;
		tsum += td->dt_ms;
	}
	tsum = tsum / cfg_nr_threads;

	if (!tsum) {
		/* rx threads never set dt_ms, and aborted tx runs skip it */
		printf("The run is too short, can't gather stats\n");
	} else {
		printf("packets=%llu (MB=%llu), rps=%llu (MB/s=%llu)\n",
			packets, bytes >> 20,
			packets * 1000 / tsum,
			(bytes >> 20) * 1000 / tsum);
	}
	pthread_barrier_destroy(&barrier);
	return 0;
}
659