• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Simple test case showing using send and recv bundles with incremental
4  * buffer ring usage
5  */
6 #include <errno.h>
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <arpa/inet.h>
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <pthread.h>
15 
16 #define MSG_SIZE 128
17 #define NR_MIN_MSGS	4
18 #define NR_MAX_MSGS	32
19 #define SEQ_SIZE	(MSG_SIZE / sizeof(unsigned long))
20 
21 static int nr_msgs;
22 
23 #define RECV_BIDS	8192
24 #define RECV_BID_MASK	(RECV_BIDS - 1)
25 
26 #include <liburing.h>
27 
28 enum t_test_result {
29 	T_EXIT_PASS   = 0,
30 	T_EXIT_FAIL   = 1,
31 	T_EXIT_SKIP   = 77,
32 };
33 
34 #define PORT	10202
35 #define HOST	"127.0.0.1"
36 
37 static int use_port = PORT;
38 
39 #define SEND_BGID	7
40 #define RECV_BGID	8
41 
42 static int no_send_mshot;
43 
/*
 * State shared between the sending (main) thread and the receiving
 * thread. All four barriers are initialized for two waiters.
 */
struct recv_data {
	pthread_barrier_t connect;	/* listener ready <-> sender may connect */
	pthread_barrier_t startup;	/* connection accepted/established */
	pthread_barrier_t barrier;	/* sync before receiver arms its recv */
	pthread_barrier_t finish;	/* both sides done (or failed) */
	unsigned long seq;		/* next expected sequence number (verify_seq) */
	int recv_bytes;			/* bytes still expected by the receiver */
	int accept_fd;			/* accepted connection fd (set by recv_prep) */
	int abort;			/* set on sender EINVAL so receiver stops waiting */
	unsigned int max_sends;		/* max messages to pre-fill the socket with */
	int to_eagain;			/* messages actually sent before EAGAIN */
	void *recv_buf;			/* backing memory for the recv buffer ring */

	int send_bundle;		/* use IORING_RECVSEND_BUNDLE on send */
	int recv_bundle;		/* use IORING_RECVSEND_BUNDLE on recv */
};
60 
arm_recv(struct io_uring * ring,struct recv_data * rd)61 static int arm_recv(struct io_uring *ring, struct recv_data *rd)
62 {
63 	struct io_uring_sqe *sqe;
64 	int ret;
65 
66 	sqe = io_uring_get_sqe(ring);
67 	io_uring_prep_recv_multishot(sqe, rd->accept_fd, NULL, 0, 0);
68 	if (rd->recv_bundle)
69 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
70 	sqe->buf_group = RECV_BGID;
71 	sqe->flags |= IOSQE_BUFFER_SELECT;
72 	sqe->user_data = 2;
73 
74 	ret = io_uring_submit(ring);
75 	if (ret != 1) {
76 		fprintf(stderr, "submit failed: %d\n", ret);
77 		return 1;
78 	}
79 
80 	return 0;
81 }
82 
recv_prep(struct io_uring * ring,struct recv_data * rd,int * sock)83 static int recv_prep(struct io_uring *ring, struct recv_data *rd, int *sock)
84 {
85 	struct sockaddr_in saddr;
86 	int sockfd, ret, val, use_fd;
87 	socklen_t socklen;
88 
89 	memset(&saddr, 0, sizeof(saddr));
90 	saddr.sin_family = AF_INET;
91 	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
92 	saddr.sin_port = htons(use_port);
93 
94 	sockfd = socket(AF_INET, SOCK_STREAM, 0);
95 	if (sockfd < 0) {
96 		perror("socket");
97 		return 1;
98 	}
99 
100 	val = 1;
101 	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
102 
103 	ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
104 	if (ret < 0) {
105 		perror("bind");
106 		goto err;
107 	}
108 
109 	ret = listen(sockfd, 1);
110 	if (ret < 0) {
111 		perror("listen");
112 		goto err;
113 	}
114 
115 	pthread_barrier_wait(&rd->connect);
116 
117 	socklen = sizeof(saddr);
118 	use_fd = accept(sockfd, (struct sockaddr *)&saddr, &socklen);
119 	if (use_fd < 0) {
120 		perror("accept");
121 		goto err;
122 	}
123 
124 	rd->accept_fd = use_fd;
125 	pthread_barrier_wait(&rd->startup);
126 	pthread_barrier_wait(&rd->barrier);
127 
128 	if (arm_recv(ring, rd))
129 		goto err;
130 
131 	*sock = sockfd;
132 	return 0;
133 err:
134 	close(sockfd);
135 	return 1;
136 }
137 
verify_seq(struct recv_data * rd,void * verify_ptr,int verify_sz,int start_bid)138 static int verify_seq(struct recv_data *rd, void *verify_ptr, int verify_sz,
139 		      int start_bid)
140 {
141 	unsigned long *seqp;
142 	int seq_size = verify_sz / sizeof(unsigned long);
143 	int i;
144 
145 	seqp = verify_ptr;
146 	for (i = 0; i < seq_size; i++) {
147 		if (rd->seq != *seqp) {
148 			fprintf(stderr, "bid=%d, got seq %lu, wanted %lu, offset %d\n", start_bid, *seqp, rd->seq, i);
149 			return 0;
150 		}
151 		seqp++;
152 		rd->seq++;
153 	}
154 
155 	return 1;
156 }
157 
recv_get_cqe(struct io_uring * ring,struct recv_data * rd,struct io_uring_cqe ** cqe)158 static int recv_get_cqe(struct io_uring *ring, struct recv_data *rd,
159 			struct io_uring_cqe **cqe)
160 {
161 	struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000LL };
162 	int ret;
163 
164 	do {
165 		ret = io_uring_wait_cqe_timeout(ring, cqe, &ts);
166 		if (!ret)
167 			return 0;
168 		if (ret == -ETIME) {
169 			if (rd->abort)
170 				break;
171 			continue;
172 		}
173 		fprintf(stderr, "wait recv: %d\n", ret);
174 		break;
175 	} while (1);
176 
177 	return 1;
178 }
179 
do_recv(struct io_uring * ring,struct recv_data * rd)180 static int do_recv(struct io_uring *ring, struct recv_data *rd)
181 {
182 	struct io_uring_cqe *cqe;
183 	void *verify_ptr;
184 	int verify_sz = 0;
185 	int verify_bid = 0;
186 	int bid;
187 
188 	verify_ptr = malloc(rd->recv_bytes);
189 
190 	do {
191 		if (recv_get_cqe(ring, rd, &cqe))
192 			break;
193 		if (cqe->res == -EINVAL) {
194 			fprintf(stdout, "recv not supported, skipping\n");
195 			return 0;
196 		}
197 		if (cqe->res < 0) {
198 			fprintf(stderr, "failed recv cqe: %d\n", cqe->res);
199 			goto err;
200 		}
201 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
202 			fprintf(stderr, "no buffer set in recv\n");
203 			goto err;
204 		}
205 		if (!(cqe->flags & IORING_CQE_F_BUF_MORE)) {
206 			fprintf(stderr, "CQE_F_BUF_MORE not set\n");
207 			goto err;
208 		}
209 		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
210 		if (bid != 0) {
211 			fprintf(stderr, "got bid %d\n", bid);
212 			goto err;
213 		}
214 		if (!(verify_sz % MSG_SIZE)) {
215 			if (!verify_seq(rd, verify_ptr, verify_sz, verify_bid))
216 				goto err;
217 			verify_bid += verify_sz / MSG_SIZE;
218 			verify_bid &= RECV_BID_MASK;
219 			verify_sz = 0;
220 		} else {
221 			memcpy(verify_ptr + verify_sz, rd->recv_buf + (bid * MSG_SIZE), cqe->res);
222 			verify_sz += cqe->res;
223 		}
224 		rd->recv_bytes -= cqe->res;
225 		io_uring_cqe_seen(ring, cqe);
226 		if (!(cqe->flags & IORING_CQE_F_MORE) && rd->recv_bytes) {
227 			if (arm_recv(ring, rd))
228 				goto err;
229 		}
230 	} while (rd->recv_bytes);
231 
232 	if (verify_sz && !(verify_sz % MSG_SIZE) &&
233 	    !verify_seq(rd, verify_ptr, verify_sz, verify_bid))
234 		goto err;
235 
236 	pthread_barrier_wait(&rd->finish);
237 	return 0;
238 err:
239 	pthread_barrier_wait(&rd->finish);
240 	return 1;
241 }
242 
recv_fn(void * data)243 static void *recv_fn(void *data)
244 {
245 	struct recv_data *rd = data;
246 	struct io_uring_params p = { };
247 	struct io_uring ring;
248 	struct io_uring_buf_ring *br;
249 	void *buf, *ptr;
250 	int ret, sock;
251 
252 	p.cq_entries = 4096;
253 	p.flags = IORING_SETUP_CQSIZE;
254 	io_uring_queue_init_params(16, &ring, &p);
255 
256 	ret = 0;
257 	if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
258 		goto err;
259 
260 	br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, IOU_PBUF_RING_INC, &ret);
261 	if (!br) {
262 		fprintf(stderr, "failed setting up recv ring %d\n", ret);
263 		goto err;
264 	}
265 
266 	ptr = buf;
267 	io_uring_buf_ring_add(br, ptr, MSG_SIZE * RECV_BIDS, 0, RECV_BID_MASK, 0);
268 	io_uring_buf_ring_advance(br, 1);
269 	rd->recv_buf = buf;
270 
271 	ret = recv_prep(&ring, rd, &sock);
272 	if (ret) {
273 		fprintf(stderr, "recv_prep failed: %d\n", ret);
274 		goto err;
275 	}
276 
277 	ret = do_recv(&ring, rd);
278 
279 	close(sock);
280 	close(rd->accept_fd);
281 	free(buf);
282 	io_uring_queue_exit(&ring);
283 err:
284 	return (void *)(intptr_t)ret;
285 }
286 
__do_send_bundle(struct recv_data * rd,struct io_uring * ring,int sockfd)287 static int __do_send_bundle(struct recv_data *rd, struct io_uring *ring, int sockfd)
288 {
289 	struct io_uring_cqe *cqe;
290 	struct io_uring_sqe *sqe;
291 	int bytes_needed = MSG_SIZE * nr_msgs;
292 	int i, ret;
293 
294 	sqe = io_uring_get_sqe(ring);
295 	io_uring_prep_send_bundle(sqe, sockfd, 0, 0);
296 	sqe->flags |= IOSQE_BUFFER_SELECT;
297 	sqe->buf_group = SEND_BGID;
298 	sqe->user_data = 1;
299 
300 	ret = io_uring_submit(ring);
301 	if (ret != 1)
302 		return 1;
303 
304 	pthread_barrier_wait(&rd->barrier);
305 
306 	for (i = 0; i < nr_msgs; i++) {
307 		ret = io_uring_wait_cqe(ring, &cqe);
308 		if (ret) {
309 			fprintf(stderr, "wait send: %d\n", ret);
310 			return 1;
311 		}
312 		if (!i && cqe->res == -EINVAL) {
313 			rd->abort = 1;
314 			no_send_mshot = 1;
315 			break;
316 		}
317 		if (cqe->res < 0) {
318 			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
319 			return 1;
320 		}
321 		bytes_needed -= cqe->res;
322 		if (!bytes_needed) {
323 			io_uring_cqe_seen(ring, cqe);
324 			break;
325 		}
326 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
327 			fprintf(stderr, "expected more, but MORE not set\n");
328 			return 1;
329 		}
330 		io_uring_cqe_seen(ring, cqe);
331 	}
332 
333 	return 0;
334 }
335 
__do_send(struct recv_data * rd,struct io_uring * ring,int sockfd)336 static int __do_send(struct recv_data *rd, struct io_uring *ring, int sockfd)
337 {
338 	struct io_uring_cqe *cqe;
339 	struct io_uring_sqe *sqe;
340 	int bytes_needed = MSG_SIZE * nr_msgs;
341 	int i, ret;
342 
343 	for (i = 0; i < nr_msgs; i++) {
344 		sqe = io_uring_get_sqe(ring);
345 		io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
346 		sqe->user_data = 10 + i;
347 		sqe->flags |= IOSQE_BUFFER_SELECT;
348 		sqe->buf_group = SEND_BGID;
349 
350 		ret = io_uring_submit(ring);
351 		if (ret != 1)
352 			return 1;
353 
354 		if (!i)
355 			pthread_barrier_wait(&rd->barrier);
356 		ret = io_uring_wait_cqe(ring, &cqe);
357 		if (ret) {
358 			fprintf(stderr, "send wait cqe %d\n", ret);
359 			return 1;
360 		}
361 
362 		if (!i && cqe->res == -EINVAL) {
363 			rd->abort = 1;
364 			no_send_mshot = 1;
365 			break;
366 		}
367 		if (cqe->res != MSG_SIZE) {
368 			fprintf(stderr, "send failed cqe: %d\n", cqe->res);
369 			return 1;
370 		}
371 		if (cqe->res < 0) {
372 			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
373 			return 1;
374 		}
375 		bytes_needed -= cqe->res;
376 		io_uring_cqe_seen(ring, cqe);
377 		if (!bytes_needed)
378 			break;
379 	}
380 
381 	return 0;
382 }
383 
do_send(struct recv_data * rd)384 static int do_send(struct recv_data *rd)
385 {
386 	struct sockaddr_in saddr;
387 	struct io_uring ring;
388 	unsigned long seq_buf[SEQ_SIZE], send_seq;
389 	struct io_uring_params p = { };
390 	struct io_uring_buf_ring *br;
391 	int sockfd, ret, len, i;
392 	socklen_t optlen;
393 	void *buf, *ptr;
394 
395 	ret = io_uring_queue_init_params(16, &ring, &p);
396 	if (ret) {
397 		fprintf(stderr, "queue init failed: %d\n", ret);
398 		return 1;
399 	}
400 	if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) {
401 		no_send_mshot = 1;
402 		return 0;
403 	}
404 
405 	if (posix_memalign(&buf, 4096, MSG_SIZE * nr_msgs))
406 		return 1;
407 
408 	br = io_uring_setup_buf_ring(&ring, nr_msgs, SEND_BGID, 0, &ret);
409 	if (!br) {
410 		if (ret == -EINVAL) {
411 			fprintf(stderr, "einval on br setup\n");
412 			return 0;
413 		}
414 		fprintf(stderr, "failed setting up send ring %d\n", ret);
415 		return 1;
416 	}
417 
418 	ptr = buf;
419 	for (i = 0; i < nr_msgs; i++) {
420 		io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, nr_msgs - 1, i);
421 		ptr += MSG_SIZE;
422 	}
423 	io_uring_buf_ring_advance(br, nr_msgs);
424 
425 	memset(&saddr, 0, sizeof(saddr));
426 	saddr.sin_family = AF_INET;
427 	saddr.sin_port = htons(use_port);
428 	inet_pton(AF_INET, HOST, &saddr.sin_addr);
429 
430 	sockfd = socket(AF_INET, SOCK_STREAM, 0);
431 	if (sockfd < 0) {
432 		perror("socket");
433 		goto err2;
434 	}
435 
436 	pthread_barrier_wait(&rd->connect);
437 
438 	ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
439 	if (ret < 0) {
440 		perror("connect");
441 		goto err;
442 	}
443 
444 	pthread_barrier_wait(&rd->startup);
445 
446 	optlen = sizeof(len);
447 	len = 1024 * MSG_SIZE;
448 	setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &len, optlen);
449 
450 	/* almost fill queue, leave room for one message */
451 	send_seq = 0;
452 	rd->to_eagain = 0;
453 	while (rd->max_sends && rd->max_sends--) {
454 		for (i = 0; i < SEQ_SIZE; i++)
455 			seq_buf[i] = send_seq++;
456 
457 		ret = send(sockfd, seq_buf, sizeof(seq_buf), MSG_DONTWAIT);
458 		if (ret < 0) {
459 			if (errno == EAGAIN) {
460 				send_seq -= SEQ_SIZE;
461 				break;
462 			}
463 			perror("send");
464 			return 1;
465 		} else if (ret != sizeof(seq_buf)) {
466 			fprintf(stderr, "short %d send\n", ret);
467 			return 1;
468 		}
469 
470 		rd->to_eagain++;
471 		rd->recv_bytes += sizeof(seq_buf);
472 	}
473 
474 	ptr = buf;
475 	for (i = 0; i < nr_msgs; i++) {
476 		unsigned long *pseq = ptr;
477 		int j;
478 
479 		for (j = 0; j < SEQ_SIZE; j++)
480 			pseq[j] = send_seq++;
481 		ptr += MSG_SIZE;
482 	}
483 
484 	/* prepare more messages, sending with bundle */
485 	rd->recv_bytes += (nr_msgs * MSG_SIZE);
486 	if (rd->send_bundle)
487 		ret = __do_send_bundle(rd, &ring, sockfd);
488 	else
489 		ret = __do_send(rd, &ring, sockfd);
490 	if (ret)
491 		goto err;
492 
493 	pthread_barrier_wait(&rd->finish);
494 
495 	close(sockfd);
496 	free(buf);
497 	io_uring_queue_exit(&ring);
498 	return 0;
499 
500 err:
501 	close(sockfd);
502 err2:
503 	io_uring_queue_exit(&ring);
504 	pthread_barrier_wait(&rd->finish);
505 	return 1;
506 }
507 
/*
 * Run one send/recv pairing: spawn the receiver thread, drive the
 * sender on this thread, and collect the receiver's exit status.
 * On success, *to_eagain (if non-NULL) receives the number of messages
 * the sender got out before hitting EAGAIN.
 *
 * backlog is currently unused in this function.
 */
static int test(int backlog, unsigned int max_sends, int *to_eagain,
		int send_bundle, int recv_bundle)
{
	pthread_t recv_thread;
	struct recv_data rd;
	int ret;
	void *retval;

	memset(&rd, 0, sizeof(rd));
	/* all four barriers pair the sender (this thread) with the receiver */
	pthread_barrier_init(&rd.connect, NULL, 2);
	pthread_barrier_init(&rd.startup, NULL, 2);
	pthread_barrier_init(&rd.barrier, NULL, 2);
	pthread_barrier_init(&rd.finish, NULL, 2);
	rd.max_sends = max_sends;
	if (to_eagain)
		*to_eagain = 0;

	rd.send_bundle = send_bundle;
	rd.recv_bundle = recv_bundle;

	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
	if (ret) {
		fprintf(stderr, "Thread create failed: %d\n", ret);
		return 1;
	}

	ret = do_send(&rd);
	/*
	 * NOTE(review): the two early returns below leave recv_thread
	 * unjoined while it may still reference stack-allocated rd -
	 * tolerated here presumably because the process exits right after
	 * a skip/failure, but confirm.
	 */
	if (no_send_mshot)
		return 0;

	if (ret)
		return ret;

	pthread_join(recv_thread, &retval);
	if (to_eagain)
		*to_eagain = rd.to_eagain;
	return (intptr_t)retval;
}
546 
run_tests(void)547 static int run_tests(void)
548 {
549 	int ret, eagain_hit;
550 
551 	nr_msgs = NR_MIN_MSGS;
552 
553 	/* test basic send bundle first */
554 	ret = test(0, 0, NULL, 0, 0);
555 	if (ret) {
556 		fprintf(stderr, "test a failed\n");
557 		return T_EXIT_FAIL;
558 	}
559 	if (no_send_mshot)
560 		return T_EXIT_SKIP;
561 
562 	/* test recv bundle */
563 	ret = test(0, 0, NULL, 0, 1);
564 	if (ret) {
565 		fprintf(stderr, "test b failed\n");
566 		return T_EXIT_FAIL;
567 	}
568 
569 	/* test bundling recv and send */
570 	ret = test(0, 0, NULL, 1, 1);
571 	if (ret) {
572 		fprintf(stderr, "test c failed\n");
573 		return T_EXIT_FAIL;
574 	}
575 
576 	/* test bundling with full socket */
577 	ret = test(1, 1000000, &eagain_hit, 1, 1);
578 	if (ret) {
579 		fprintf(stderr, "test d failed\n");
580 		return T_EXIT_FAIL;
581 	}
582 
583 	/* test bundling with almost full socket */
584 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
585 	if (ret) {
586 		fprintf(stderr, "test e failed\n");
587 		return T_EXIT_FAIL;
588 	}
589 
590 	/* test recv bundle with almost full socket */
591 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
592 	if (ret) {
593 		fprintf(stderr, "test f failed\n");
594 		return T_EXIT_FAIL;
595 	}
596 
597 	/* test send bundle with almost full socket */
598 	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
599 	if (ret) {
600 		fprintf(stderr, "test g failed\n");
601 		return T_EXIT_FAIL;
602 	}
603 
604 	/* now repeat the last three tests, but with > FAST_UIOV segments */
605 	nr_msgs = NR_MAX_MSGS;
606 
607 	/* test bundling with almost full socket */
608 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
609 	if (ret) {
610 		fprintf(stderr, "test h failed\n");
611 		return T_EXIT_FAIL;
612 	}
613 
614 	/* test recv bundle with almost full socket */
615 	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
616 	if (ret) {
617 		fprintf(stderr, "test i failed\n");
618 		return T_EXIT_FAIL;
619 	}
620 
621 	/* test send bundle with almost full socket */
622 	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
623 	if (ret) {
624 		fprintf(stderr, "test j failed\n");
625 		return T_EXIT_FAIL;
626 	}
627 
628 	return T_EXIT_PASS;
629 }
630 
test_tcp(void)631 static int test_tcp(void)
632 {
633 	int ret;
634 
635 	ret = run_tests();
636 	if (ret == T_EXIT_FAIL)
637 		fprintf(stderr, "TCP test case failed\n");
638 	return ret;
639 }
640 
has_pbuf_ring_inc(void)641 static bool has_pbuf_ring_inc(void)
642 {
643 	struct io_uring_buf_ring *br;
644 	bool has_pbuf_inc = false;
645 	struct io_uring ring;
646 	void *buf;
647 	int ret;
648 
649 	ret = io_uring_queue_init(1, &ring, 0);
650 	if (ret)
651 		return false;
652 
653 	if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
654 		return false;
655 
656 	br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, IOU_PBUF_RING_INC, &ret);
657 	if (br) {
658 		has_pbuf_inc = true;
659 		io_uring_unregister_buf_ring(&ring, RECV_BGID);
660 	}
661 	io_uring_queue_exit(&ring);
662 	free(buf);
663 	return has_pbuf_inc;
664 }
665 
main(int argc,char * argv[])666 int main(int argc, char *argv[])
667 {
668 	int ret;
669 
670 	if (argc > 1)
671 		return T_EXIT_SKIP;
672 	if (!has_pbuf_ring_inc())
673 		return T_EXIT_SKIP;
674 
675 	ret = test_tcp();
676 	if (ret != T_EXIT_PASS)
677 		return ret;
678 
679 	return T_EXIT_PASS;
680 }
681