1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Simple test case showing using send and recv bundles
4  */
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
14 
15 #define MSG_SIZE 128
16 #define NR_MIN_MSGS	4
17 #define NR_MAX_MSGS	32
18 #define SEQ_SIZE	(MSG_SIZE / sizeof(unsigned long))
19 
20 static int nr_msgs;
21 static int use_tcp;
22 
23 #define RECV_BIDS	8192
24 #define RECV_BID_MASK	(RECV_BIDS - 1)
25 
26 #include "liburing.h"
27 #include "helpers.h"
28 
29 #define PORT	10202
30 #define HOST	"127.0.0.1"
31 
32 static int use_port = PORT;
33 
34 #define SEND_BGID	7
35 #define RECV_BGID	8
36 
37 static int no_send_mshot;
38 
39 struct recv_data {
40 	pthread_barrier_t connect;
41 	pthread_barrier_t startup;
42 	pthread_barrier_t barrier;
43 	pthread_barrier_t finish;
44 	unsigned long seq;
45 	int recv_bytes;
46 	int accept_fd;
47 	int abort;
48 	unsigned int max_sends;
49 	int to_eagain;
50 	void *recv_buf;
51 
52 	int send_bundle;
53 	int recv_bundle;
54 };
55 
arm_recv(struct io_uring * ring,struct recv_data * rd)56 static int arm_recv(struct io_uring *ring, struct recv_data *rd)
57 {
58 	struct io_uring_sqe *sqe;
59 	int ret;
60 
61 	sqe = io_uring_get_sqe(ring);
62 	io_uring_prep_recv_multishot(sqe, rd->accept_fd, NULL, 0, 0);
63 	if (rd->recv_bundle && use_tcp)
64 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
65 	sqe->buf_group = RECV_BGID;
66 	sqe->flags |= IOSQE_BUFFER_SELECT;
67 	sqe->user_data = 2;
68 
69 	ret = io_uring_submit(ring);
70 	if (ret != 1) {
71 		fprintf(stderr, "submit failed: %d\n", ret);
72 		return 1;
73 	}
74 
75 	return 0;
76 }
77 
recv_prep(struct io_uring * ring,struct recv_data * rd,int * sock)78 static int recv_prep(struct io_uring *ring, struct recv_data *rd, int *sock)
79 {
80 	struct sockaddr_in saddr;
81 	int sockfd, ret, val, use_fd;
82 	socklen_t socklen;
83 
84 	memset(&saddr, 0, sizeof(saddr));
85 	saddr.sin_family = AF_INET;
86 	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
87 	saddr.sin_port = htons(use_port);
88 
89 	if (use_tcp)
90 		sockfd = socket(AF_INET, SOCK_STREAM, 0);
91 	else
92 		sockfd = socket(AF_INET, SOCK_DGRAM, 0);
93 	if (sockfd < 0) {
94 		perror("socket");
95 		return 1;
96 	}
97 
98 	val = 1;
99 	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
100 
101 	ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
102 	if (ret < 0) {
103 		perror("bind");
104 		goto err;
105 	}
106 
107 	if (use_tcp) {
108 		ret = listen(sockfd, 1);
109 		if (ret < 0) {
110 			perror("listen");
111 			goto err;
112 		}
113 
114 		pthread_barrier_wait(&rd->connect);
115 
116 		socklen = sizeof(saddr);
117 		use_fd = accept(sockfd, (struct sockaddr *)&saddr, &socklen);
118 		if (use_fd < 0) {
119 			perror("accept");
120 			goto err;
121 		}
122 	} else {
123 		use_fd = sockfd;
124 		pthread_barrier_wait(&rd->connect);
125 	}
126 
127 	rd->accept_fd = use_fd;
128 	pthread_barrier_wait(&rd->startup);
129 	pthread_barrier_wait(&rd->barrier);
130 
131 	if (arm_recv(ring, rd))
132 		goto err;
133 
134 	*sock = sockfd;
135 	return 0;
136 err:
137 	close(sockfd);
138 	return 1;
139 }
140 
verify_seq(struct recv_data * rd,void * verify_ptr,int verify_sz,int start_bid)141 static int verify_seq(struct recv_data *rd, void *verify_ptr, int verify_sz,
142 		      int start_bid)
143 {
144 	unsigned long *seqp;
145 	int seq_size = verify_sz / sizeof(unsigned long);
146 	int i;
147 
148 	seqp = verify_ptr;
149 	for (i = 0; i < seq_size; i++) {
150 		if (rd->seq != *seqp) {
151 			fprintf(stderr, "bid=%d, got seq %lu, wanted %lu, offset %d\n", start_bid, *seqp, rd->seq, i);
152 			return 0;
153 		}
154 		seqp++;
155 		rd->seq++;
156 	}
157 
158 	return 1;
159 }
160 
recv_get_cqe(struct io_uring * ring,struct recv_data * rd,struct io_uring_cqe ** cqe)161 static int recv_get_cqe(struct io_uring *ring, struct recv_data *rd,
162 			struct io_uring_cqe **cqe)
163 {
164 	struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000LL };
165 	int ret;
166 
167 	do {
168 		ret = io_uring_wait_cqe_timeout(ring, cqe, &ts);
169 		if (!ret)
170 			return 0;
171 		if (ret == -ETIME) {
172 			if (rd->abort)
173 				break;
174 			continue;
175 		}
176 		fprintf(stderr, "wait recv: %d\n", ret);
177 		break;
178 	} while (1);
179 
180 	return 1;
181 }
182 
do_recv(struct io_uring * ring,struct recv_data * rd)183 static int do_recv(struct io_uring *ring, struct recv_data *rd)
184 {
185 	struct io_uring_cqe *cqe;
186 	int bid, next_bid = 0;
187 	void *verify_ptr;
188 	int verify_sz = 0;
189 	int verify_bid = 0;
190 
191 	verify_ptr = malloc(rd->recv_bytes);
192 
193 	do {
194 		if (recv_get_cqe(ring, rd, &cqe))
195 			break;
196 		if (cqe->res == -EINVAL) {
197 			fprintf(stdout, "recv not supported, skipping\n");
198 			return 0;
199 		}
200 		if (cqe->res < 0) {
201 			fprintf(stderr, "failed recv cqe: %d\n", cqe->res);
202 			goto err;
203 		}
204 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
205 			fprintf(stderr, "no buffer set in recv\n");
206 			goto err;
207 		}
208 		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
209 		if (bid != next_bid) {
210 			fprintf(stderr, "got bid %d, wanted %d\n", bid, next_bid);
211 			goto err;
212 		}
213 		if (!rd->recv_bundle && cqe->res != MSG_SIZE) {
214 			fprintf(stderr, "recv got wrong length: %d\n", cqe->res);
215 			goto err;
216 		}
217 		if (!(verify_sz % MSG_SIZE)) {
218 			if (!verify_seq(rd, verify_ptr, verify_sz, verify_bid))
219 				goto err;
220 			verify_bid += verify_sz / MSG_SIZE;
221 			verify_bid &= RECV_BID_MASK;
222 			verify_sz = 0;
223 		} else {
224 			memcpy(verify_ptr + verify_sz, rd->recv_buf + (bid * MSG_SIZE), cqe->res);
225 			verify_sz += cqe->res;
226 		}
227 		next_bid = bid + ((cqe->res + MSG_SIZE - 1) / MSG_SIZE);
228 		next_bid &= RECV_BID_MASK;
229 		rd->recv_bytes -= cqe->res;
230 		io_uring_cqe_seen(ring, cqe);
231 		if (!(cqe->flags & IORING_CQE_F_MORE) && rd->recv_bytes) {
232 			if (arm_recv(ring, rd))
233 				goto err;
234 		}
235 	} while (rd->recv_bytes);
236 
237 	if (verify_sz && !(verify_sz % MSG_SIZE) &&
238 	    !verify_seq(rd, verify_ptr, verify_sz, verify_bid))
239 		goto err;
240 
241 	pthread_barrier_wait(&rd->finish);
242 	return 0;
243 err:
244 	pthread_barrier_wait(&rd->finish);
245 	return 1;
246 }
247 
recv_fn(void * data)248 static void *recv_fn(void *data)
249 {
250 	struct recv_data *rd = data;
251 	struct io_uring_params p = { };
252 	struct io_uring ring;
253 	struct io_uring_buf_ring *br;
254 	void *buf, *ptr;
255 	int ret, sock, i;
256 
257 	p.cq_entries = 4096;
258 	p.flags = IORING_SETUP_CQSIZE;
259 	ret = t_create_ring_params(16, &ring, &p);
260 	if (ret == T_SETUP_SKIP) {
261 		ret = 0;
262 		goto err;
263 	} else if (ret < 0) {
264 		goto err;
265 	}
266 
267 	if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
268 		goto err;
269 
270 	br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, 0, &ret);
271 	if (!br) {
272 		fprintf(stderr, "failed setting up recv ring %d\n", ret);
273 		goto err;
274 	}
275 
276 	ptr = buf;
277 	for (i = 0; i < RECV_BIDS; i++) {
278 		io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, RECV_BID_MASK, i);
279 		ptr += MSG_SIZE;
280 	}
281 	io_uring_buf_ring_advance(br, RECV_BIDS);
282 	rd->recv_buf = buf;
283 
284 	ret = recv_prep(&ring, rd, &sock);
285 	if (ret) {
286 		fprintf(stderr, "recv_prep failed: %d\n", ret);
287 		goto err;
288 	}
289 
290 	ret = do_recv(&ring, rd);
291 
292 	close(sock);
293 	close(rd->accept_fd);
294 	io_uring_queue_exit(&ring);
295 err:
296 	return (void *)(intptr_t)ret;
297 }
298 
__do_send_bundle(struct recv_data * rd,struct io_uring * ring,int sockfd)299 static int __do_send_bundle(struct recv_data *rd, struct io_uring *ring, int sockfd)
300 {
301 	struct io_uring_cqe *cqe;
302 	struct io_uring_sqe *sqe;
303 	int bytes_needed = MSG_SIZE * nr_msgs;
304 	int i, ret;
305 
306 	sqe = io_uring_get_sqe(ring);
307 	io_uring_prep_send_bundle(sqe, sockfd, 0, 0);
308 	sqe->flags |= IOSQE_BUFFER_SELECT;
309 	sqe->buf_group = SEND_BGID;
310 	sqe->user_data = 1;
311 
312 	ret = io_uring_submit(ring);
313 	if (ret != 1)
314 		return 1;
315 
316 	pthread_barrier_wait(&rd->barrier);
317 
318 	for (i = 0; i < nr_msgs; i++) {
319 		ret = io_uring_wait_cqe(ring, &cqe);
320 		if (ret) {
321 			fprintf(stderr, "wait send: %d\n", ret);
322 			return 1;
323 		}
324 		if (!i && cqe->res == -EINVAL) {
325 			rd->abort = 1;
326 			no_send_mshot = 1;
327 			break;
328 		}
329 		if (cqe->res < 0) {
330 			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
331 			return 1;
332 		}
333 		bytes_needed -= cqe->res;
334 		if (!bytes_needed) {
335 			io_uring_cqe_seen(ring, cqe);
336 			break;
337 		}
338 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
339 			fprintf(stderr, "expected more, but MORE not set\n");
340 			return 1;
341 		}
342 		io_uring_cqe_seen(ring, cqe);
343 	}
344 
345 	return 0;
346 }
347 
__do_send(struct recv_data * rd,struct io_uring * ring,int sockfd)348 static int __do_send(struct recv_data *rd, struct io_uring *ring, int sockfd)
349 {
350 	struct io_uring_cqe *cqe;
351 	struct io_uring_sqe *sqe;
352 	int bytes_needed = MSG_SIZE * nr_msgs;
353 	int i, ret;
354 
355 	for (i = 0; i < nr_msgs; i++) {
356 		sqe = io_uring_get_sqe(ring);
357 		io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
358 		sqe->user_data = 10 + i;
359 		sqe->flags |= IOSQE_BUFFER_SELECT;
360 		sqe->buf_group = SEND_BGID;
361 
362 		ret = io_uring_submit(ring);
363 		if (ret != 1)
364 			return 1;
365 
366 		if (!i)
367 			pthread_barrier_wait(&rd->barrier);
368 		ret = io_uring_wait_cqe(ring, &cqe);
369 		if (ret) {
370 			fprintf(stderr, "send wait cqe %d\n", ret);
371 			return 1;
372 		}
373 
374 		if (!i && cqe->res == -EINVAL) {
375 			rd->abort = 1;
376 			no_send_mshot = 1;
377 			break;
378 		}
379 		if (cqe->res != MSG_SIZE) {
380 			fprintf(stderr, "send failed cqe: %d\n", cqe->res);
381 			return 1;
382 		}
383 		if (cqe->res < 0) {
384 			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
385 			return 1;
386 		}
387 		bytes_needed -= cqe->res;
388 		io_uring_cqe_seen(ring, cqe);
389 		if (!bytes_needed)
390 			break;
391 	}
392 
393 	return 0;
394 }
395 
do_send(struct recv_data * rd)396 static int do_send(struct recv_data *rd)
397 {
398 	struct sockaddr_in saddr;
399 	struct io_uring ring;
400 	unsigned long seq_buf[SEQ_SIZE], send_seq;
401 	struct io_uring_params p = { };
402 	struct io_uring_buf_ring *br;
403 	int sockfd, ret, len, i;
404 	socklen_t optlen;
405 	void *buf, *ptr;
406 
407 	ret = io_uring_queue_init_params(16, &ring, &p);
408 	if (ret) {
409 		fprintf(stderr, "queue init failed: %d\n", ret);
410 		return 1;
411 	}
412 	if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) {
413 		no_send_mshot = 1;
414 		return 0;
415 	}
416 
417 	if (posix_memalign(&buf, 4096, MSG_SIZE * nr_msgs))
418 		return 1;
419 
420 	br = io_uring_setup_buf_ring(&ring, nr_msgs, SEND_BGID, 0, &ret);
421 	if (!br) {
422 		if (ret == -EINVAL) {
423 			fprintf(stderr, "einval on br setup\n");
424 			return 0;
425 		}
426 		fprintf(stderr, "failed setting up send ring %d\n", ret);
427 		return 1;
428 	}
429 
430 	ptr = buf;
431 	for (i = 0; i < nr_msgs; i++) {
432 		io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, nr_msgs - 1, i);
433 		ptr += MSG_SIZE;
434 	}
435 	io_uring_buf_ring_advance(br, nr_msgs);
436 
437 	memset(&saddr, 0, sizeof(saddr));
438 	saddr.sin_family = AF_INET;
439 	saddr.sin_port = htons(use_port);
440 	inet_pton(AF_INET, HOST, &saddr.sin_addr);
441 
442 	if (use_tcp)
443 		sockfd = socket(AF_INET, SOCK_STREAM, 0);
444 	else
445 		sockfd = socket(AF_INET, SOCK_DGRAM, 0);
446 	if (sockfd < 0) {
447 		perror("socket");
448 		goto err2;
449 	}
450 
451 	pthread_barrier_wait(&rd->connect);
452 
453 	ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
454 	if (ret < 0) {
455 		perror("connect");
456 		goto err;
457 	}
458 
459 	pthread_barrier_wait(&rd->startup);
460 
461 	optlen = sizeof(len);
462 	len = 1024 * MSG_SIZE;
463 	setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &len, optlen);
464 
465 	/* almost fill queue, leave room for one message */
466 	send_seq = 0;
467 	rd->to_eagain = 0;
468 	while (rd->max_sends && rd->max_sends--) {
469 		for (i = 0; i < SEQ_SIZE; i++)
470 			seq_buf[i] = send_seq++;
471 
472 		ret = send(sockfd, seq_buf, sizeof(seq_buf), MSG_DONTWAIT);
473 		if (ret < 0) {
474 			if (errno == EAGAIN) {
475 				send_seq -= SEQ_SIZE;
476 				break;
477 			}
478 			perror("send");
479 			return 1;
480 		} else if (ret != sizeof(seq_buf)) {
481 			fprintf(stderr, "short %d send\n", ret);
482 			return 1;
483 		}
484 
485 		rd->to_eagain++;
486 		rd->recv_bytes += sizeof(seq_buf);
487 	}
488 
489 	ptr = buf;
490 	for (i = 0; i < nr_msgs; i++) {
491 		unsigned long *pseq = ptr;
492 		int j;
493 
494 		for (j = 0; j < SEQ_SIZE; j++)
495 			pseq[j] = send_seq++;
496 		ptr += MSG_SIZE;
497 	}
498 
499 	/* prepare more messages, sending with bundle */
500 	rd->recv_bytes += (nr_msgs * MSG_SIZE);
501 	if (rd->send_bundle && use_tcp)
502 		ret = __do_send_bundle(rd, &ring, sockfd);
503 	else
504 		ret = __do_send(rd, &ring, sockfd);
505 	if (ret)
506 		goto err;
507 
508 	pthread_barrier_wait(&rd->finish);
509 
510 	close(sockfd);
511 	io_uring_queue_exit(&ring);
512 	return 0;
513 
514 err:
515 	close(sockfd);
516 err2:
517 	io_uring_queue_exit(&ring);
518 	pthread_barrier_wait(&rd->finish);
519 	return 1;
520 }
521 
test(int backlog,unsigned int max_sends,int * to_eagain,int send_bundle,int recv_bundle)522 static int test(int backlog, unsigned int max_sends, int *to_eagain,
523 		int send_bundle, int recv_bundle)
524 {
525 	pthread_t recv_thread;
526 	struct recv_data rd;
527 	int ret;
528 	void *retval;
529 
530 	/* backlog not reliable on UDP, skip it */
531 	if ((backlog || max_sends) && !use_tcp)
532 		return T_EXIT_PASS;
533 
534 	memset(&rd, 0, sizeof(rd));
535 	pthread_barrier_init(&rd.connect, NULL, 2);
536 	pthread_barrier_init(&rd.startup, NULL, 2);
537 	pthread_barrier_init(&rd.barrier, NULL, 2);
538 	pthread_barrier_init(&rd.finish, NULL, 2);
539 	rd.max_sends = max_sends;
540 	if (to_eagain)
541 		*to_eagain = 0;
542 
543 	rd.send_bundle = send_bundle;
544 	rd.recv_bundle = recv_bundle;
545 
546 	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
547 	if (ret) {
548 		fprintf(stderr, "Thread create failed: %d\n", ret);
549 		return 1;
550 	}
551 
552 	ret = do_send(&rd);
553 	if (no_send_mshot)
554 		return 0;
555 
556 	if (ret)
557 		return ret;
558 
559 	pthread_join(recv_thread, &retval);
560 	if (to_eagain)
561 		*to_eagain = rd.to_eagain;
562 	return (intptr_t)retval;
563 }
564 
run_tests(int is_udp)565 static int run_tests(int is_udp)
566 {
567 	int ret, eagain_hit;
568 
569 	nr_msgs = NR_MIN_MSGS;
570 
571 	/* test basic send bundle first */
572 	ret = test(0, 0, NULL, 0, 0);
573 	if (ret) {
574 		fprintf(stderr, "test a failed\n");
575 		return T_EXIT_FAIL;
576 	}
577 	if (no_send_mshot)
578 		return T_EXIT_SKIP;
579 
580 	/* test recv bundle */
581 	ret = test(0, 0, NULL, 0, 1);
582 	if (ret) {
583 		fprintf(stderr, "test b failed\n");
584 		return T_EXIT_FAIL;
585 	}
586 
587 	/* test bundling recv and send */
588 	ret = test(0, 0, NULL, 1, 1);
589 	if (ret) {
590 		fprintf(stderr, "test c failed\n");
591 		return T_EXIT_FAIL;
592 	}
593 
594 	/* test bundling with full socket */
595 	ret = test(1, 1000000, &eagain_hit, 1, 1);
596 	if (ret) {
597 		fprintf(stderr, "test d failed\n");
598 		return T_EXIT_FAIL;
599 	}
600 
601 	/* test bundling with almost full socket */
602 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
603 	if (ret) {
604 		fprintf(stderr, "test e failed\n");
605 		return T_EXIT_FAIL;
606 	}
607 
608 	/* test recv bundle with almost full socket */
609 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
610 	if (ret) {
611 		fprintf(stderr, "test f failed\n");
612 		return T_EXIT_FAIL;
613 	}
614 
615 	if (is_udp)
616 		return T_EXIT_PASS;
617 
618 	/* test send bundle with almost full socket */
619 	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
620 	if (ret) {
621 		fprintf(stderr, "test g failed\n");
622 		return T_EXIT_FAIL;
623 	}
624 
625 	/* now repeat the last three tests, but with > FAST_UIOV segments */
626 	nr_msgs = NR_MAX_MSGS;
627 
628 	/* test bundling with almost full socket */
629 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
630 	if (ret) {
631 		fprintf(stderr, "test h failed\n");
632 		return T_EXIT_FAIL;
633 	}
634 
635 	/* test recv bundle with almost full socket */
636 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
637 	if (ret) {
638 		fprintf(stderr, "test i failed\n");
639 		return T_EXIT_FAIL;
640 	}
641 
642 	/* test send bundle with almost full socket */
643 	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
644 	if (ret) {
645 		fprintf(stderr, "test j failed\n");
646 		return T_EXIT_FAIL;
647 	}
648 
649 	return T_EXIT_PASS;
650 }
651 
test_tcp(void)652 static int test_tcp(void)
653 {
654 	int ret;
655 
656 	use_tcp = 1;
657 	ret = run_tests(false);
658 	if (ret == T_EXIT_FAIL)
659 		fprintf(stderr, "TCP test case failed\n");
660 	return ret;
661 }
662 
test_udp(void)663 static int test_udp(void)
664 {
665 	int ret;
666 
667 	use_tcp = 0;
668 	use_port++;
669 	ret = run_tests(true);
670 	if (ret == T_EXIT_FAIL)
671 		fprintf(stderr, "UDP test case failed\n");
672 	return ret;
673 }
674 
main(int argc,char * argv[])675 int main(int argc, char *argv[])
676 {
677 	int ret;
678 
679 	if (argc > 1)
680 		return T_EXIT_SKIP;
681 
682 	ret = test_tcp();
683 	if (ret != T_EXIT_PASS)
684 		return ret;
685 
686 	ret = test_udp();
687 	if (ret != T_EXIT_PASS)
688 		return ret;
689 
690 	return T_EXIT_PASS;
691 }
692