/* SPDX-License-Identifier: MIT */
/*
 * Simple test case showing the use of send and recv bundles
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>

#define MSG_SIZE	128
#define NR_MIN_MSGS	4
#define NR_MAX_MSGS	32
#define SEQ_SIZE	(MSG_SIZE / sizeof(unsigned long))

static int nr_msgs;
static int use_tcp;

#define RECV_BIDS	8192
#define RECV_BID_MASK	(RECV_BIDS - 1)

#include "liburing.h"
#include "helpers.h"

#define PORT	10202
#define HOST	"127.0.0.1"

static int use_port = PORT;

#define SEND_BGID	7
#define RECV_BGID	8

static int no_send_mshot;

struct recv_data {
	pthread_barrier_t connect;
	pthread_barrier_t startup;
	pthread_barrier_t barrier;
	pthread_barrier_t finish;
	unsigned long seq;
	int recv_bytes;
	int accept_fd;
	int abort;
	unsigned int max_sends;
	int to_eagain;
	void *recv_buf;

	int send_bundle;
	int recv_bundle;
};

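/*
 * Arm a multishot receive on the connected socket, selecting buffers from
 * the RECV_BGID group. When receive bundles are being tested over TCP,
 * IORING_RECVSEND_BUNDLE is set so a single completion may cover more than
 * one provided buffer.
 */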
static int arm_recv(struct io_uring *ring, struct recv_data *rd)
{
	struct io_uring_sqe *sqe;
	int ret;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_recv_multishot(sqe, rd->accept_fd, NULL, 0, 0);
	if (rd->recv_bundle && use_tcp)
		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
	sqe->buf_group = RECV_BGID;
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->user_data = 2;

	ret = io_uring_submit(ring);
	if (ret != 1) {
		fprintf(stderr, "submit failed: %d\n", ret);
		return 1;
	}

	return 0;
}

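/*
 * Set up the receive side socket: bind to the test port, accept the TCP
 * connection (or use the UDP socket directly), publish the resulting fd in
 * rd->accept_fd, and arm the first multishot receive. The barriers keep the
 * sender and receiver threads in lockstep during setup.
 */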
static int recv_prep(struct io_uring *ring, struct recv_data *rd, int *sock)
{
	struct sockaddr_in saddr;
	int sockfd, ret, val, use_fd;
	socklen_t socklen;

	memset(&saddr, 0, sizeof(saddr));
	saddr.sin_family = AF_INET;
	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
	saddr.sin_port = htons(use_port);

	if (use_tcp)
		sockfd = socket(AF_INET, SOCK_STREAM, 0);
	else
		sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0) {
		perror("socket");
		return 1;
	}

	val = 1;
	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));

	ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
	if (ret < 0) {
		perror("bind");
		goto err;
	}

	if (use_tcp) {
		ret = listen(sockfd, 1);
		if (ret < 0) {
			perror("listen");
			goto err;
		}

		pthread_barrier_wait(&rd->connect);

		socklen = sizeof(saddr);
		use_fd = accept(sockfd, (struct sockaddr *)&saddr, &socklen);
		if (use_fd < 0) {
			perror("accept");
			goto err;
		}
	} else {
		use_fd = sockfd;
		pthread_barrier_wait(&rd->connect);
	}

	rd->accept_fd = use_fd;
	pthread_barrier_wait(&rd->startup);
	pthread_barrier_wait(&rd->barrier);

	if (arm_recv(ring, rd))
		goto err;

	*sock = sockfd;
	return 0;
err:
	close(sockfd);
	return 1;
}

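/*
 * Verify that the received payload continues the expected sequence of
 * incrementing unsigned longs. Returns 1 on success, 0 on a mismatch.
 */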
static int verify_seq(struct recv_data *rd, void *verify_ptr, int verify_sz,
		      int start_bid)
{
	unsigned long *seqp;
	int seq_size = verify_sz / sizeof(unsigned long);
	int i;

	seqp = verify_ptr;
	for (i = 0; i < seq_size; i++) {
		if (rd->seq != *seqp) {
			fprintf(stderr, "bid=%d, got seq %lu, wanted %lu, offset %d\n", start_bid, *seqp, rd->seq, i);
			return 0;
		}
		seqp++;
		rd->seq++;
	}

	return 1;
}

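/*
 * Wait for a receive CQE, polling with a 100ms timeout so the receiver can
 * notice rd->abort (set by the sender if bundles aren't supported) instead
 * of blocking forever. Returns 0 with a CQE on success, 1 otherwise.
 */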
static int recv_get_cqe(struct io_uring *ring, struct recv_data *rd,
			struct io_uring_cqe **cqe)
{
	struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000LL };
	int ret;

	do {
		ret = io_uring_wait_cqe_timeout(ring, cqe, &ts);
		if (!ret)
			return 0;
		if (ret == -ETIME) {
			if (rd->abort)
				break;
			continue;
		}
		fprintf(stderr, "wait recv: %d\n", ret);
		break;
	} while (1);

	return 1;
}

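/*
 * Reap receive completions until all expected bytes have arrived. Buffer
 * IDs must come back in order, data is accumulated into verify_ptr and
 * checked one MSG_SIZE multiple at a time, and the multishot receive is
 * re-armed whenever a CQE arrives without IORING_CQE_F_MORE set.
 */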
static int do_recv(struct io_uring *ring, struct recv_data *rd)
{
	struct io_uring_cqe *cqe;
	int bid, next_bid = 0;
	void *verify_ptr;
	int verify_sz = 0;
	int verify_bid = 0;

	verify_ptr = malloc(rd->recv_bytes);

	do {
		if (recv_get_cqe(ring, rd, &cqe))
			break;
		if (cqe->res == -EINVAL) {
			fprintf(stdout, "recv not supported, skipping\n");
			return 0;
		}
		if (cqe->res < 0) {
			fprintf(stderr, "failed recv cqe: %d\n", cqe->res);
			goto err;
		}
		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
			fprintf(stderr, "no buffer set in recv\n");
			goto err;
		}
		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
		if (bid != next_bid) {
			fprintf(stderr, "got bid %d, wanted %d\n", bid, next_bid);
			goto err;
		}
		if (!rd->recv_bundle && cqe->res != MSG_SIZE) {
			fprintf(stderr, "recv got wrong length: %d\n", cqe->res);
			goto err;
		}
		/*
		 * Copy the received data out of the provided buffer first,
		 * then verify the sequence once a full multiple of MSG_SIZE
		 * has been accumulated.
		 */
		memcpy(verify_ptr + verify_sz, rd->recv_buf + (bid * MSG_SIZE), cqe->res);
		verify_sz += cqe->res;
		if (!(verify_sz % MSG_SIZE)) {
			if (!verify_seq(rd, verify_ptr, verify_sz, verify_bid))
				goto err;
			verify_bid += verify_sz / MSG_SIZE;
			verify_bid &= RECV_BID_MASK;
			verify_sz = 0;
		}
		next_bid = bid + ((cqe->res + MSG_SIZE - 1) / MSG_SIZE);
		next_bid &= RECV_BID_MASK;
		rd->recv_bytes -= cqe->res;
		io_uring_cqe_seen(ring, cqe);
		if (!(cqe->flags & IORING_CQE_F_MORE) && rd->recv_bytes) {
			if (arm_recv(ring, rd))
				goto err;
		}
	} while (rd->recv_bytes);

	if (verify_sz && !(verify_sz % MSG_SIZE) &&
	    !verify_seq(rd, verify_ptr, verify_sz, verify_bid))
		goto err;

	pthread_barrier_wait(&rd->finish);
	return 0;
err:
	pthread_barrier_wait(&rd->finish);
	return 1;
}

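/*
 * Receive side thread: create a ring with a larger CQ, register a provided
 * buffer ring of RECV_BIDS MSG_SIZE buffers under RECV_BGID, then run the
 * socket setup and receive loop. The result is returned via pthread_join.
 */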
static void *recv_fn(void *data)
{
	struct recv_data *rd = data;
	struct io_uring_params p = { };
	struct io_uring ring;
	struct io_uring_buf_ring *br;
	void *buf, *ptr;
	int ret, sock, i;

	p.cq_entries = 4096;
	p.flags = IORING_SETUP_CQSIZE;
	ret = t_create_ring_params(16, &ring, &p);
	if (ret == T_SETUP_SKIP) {
		ret = 0;
		goto err;
	} else if (ret < 0) {
		goto err;
	}

	if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
		goto err;

	br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, 0, &ret);
	if (!br) {
		fprintf(stderr, "failed setting up recv ring %d\n", ret);
		goto err;
	}

	ptr = buf;
	for (i = 0; i < RECV_BIDS; i++) {
		io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, RECV_BID_MASK, i);
		ptr += MSG_SIZE;
	}
	io_uring_buf_ring_advance(br, RECV_BIDS);
	rd->recv_buf = buf;

	ret = recv_prep(&ring, rd, &sock);
	if (ret) {
		fprintf(stderr, "recv_prep failed: %d\n", ret);
		goto err;
	}

	ret = do_recv(&ring, rd);

	close(sock);
	close(rd->accept_fd);
	io_uring_queue_exit(&ring);
err:
	return (void *)(intptr_t)ret;
}

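/*
 * Send all queued messages with a single bundle send: one SQE that picks
 * buffers from the SEND_BGID group, then reap CQEs until the expected
 * number of bytes has been acknowledged. An -EINVAL on the first CQE means
 * bundles aren't supported, so flag the receiver to abort.
 */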
static int __do_send_bundle(struct recv_data *rd, struct io_uring *ring, int sockfd)
{
	struct io_uring_cqe *cqe;
	struct io_uring_sqe *sqe;
	int bytes_needed = MSG_SIZE * nr_msgs;
	int i, ret;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_send_bundle(sqe, sockfd, 0, 0);
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->buf_group = SEND_BGID;
	sqe->user_data = 1;

	ret = io_uring_submit(ring);
	if (ret != 1)
		return 1;

	pthread_barrier_wait(&rd->barrier);

	for (i = 0; i < nr_msgs; i++) {
		ret = io_uring_wait_cqe(ring, &cqe);
		if (ret) {
			fprintf(stderr, "wait send: %d\n", ret);
			return 1;
		}
		if (!i && cqe->res == -EINVAL) {
			rd->abort = 1;
			no_send_mshot = 1;
			break;
		}
		if (cqe->res < 0) {
			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
			return 1;
		}
		bytes_needed -= cqe->res;
		if (!bytes_needed) {
			io_uring_cqe_seen(ring, cqe);
			break;
		}
		if (!(cqe->flags & IORING_CQE_F_MORE)) {
			fprintf(stderr, "expected more, but MORE not set\n");
			return 1;
		}
		io_uring_cqe_seen(ring, cqe);
	}

	return 0;
}

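/*
 * Non-bundled variant: issue one buffer-select send per message and wait
 * for each CQE individually, until the expected byte count has been sent.
 */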
static int __do_send(struct recv_data *rd, struct io_uring *ring, int sockfd)
{
	struct io_uring_cqe *cqe;
	struct io_uring_sqe *sqe;
	int bytes_needed = MSG_SIZE * nr_msgs;
	int i, ret;

	for (i = 0; i < nr_msgs; i++) {
		sqe = io_uring_get_sqe(ring);
		io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
		sqe->user_data = 10 + i;
		sqe->flags |= IOSQE_BUFFER_SELECT;
		sqe->buf_group = SEND_BGID;

		ret = io_uring_submit(ring);
		if (ret != 1)
			return 1;

		if (!i)
			pthread_barrier_wait(&rd->barrier);
		ret = io_uring_wait_cqe(ring, &cqe);
		if (ret) {
			fprintf(stderr, "send wait cqe %d\n", ret);
			return 1;
		}

		if (!i && cqe->res == -EINVAL) {
			rd->abort = 1;
			no_send_mshot = 1;
			break;
		}
		if (cqe->res != MSG_SIZE) {
			fprintf(stderr, "send failed cqe: %d\n", cqe->res);
			return 1;
		}
		if (cqe->res < 0) {
			fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
			return 1;
		}
		bytes_needed -= cqe->res;
		io_uring_cqe_seen(ring, cqe);
		if (!bytes_needed)
			break;
	}

	return 0;
}

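/*
 * Sender side. Register a SEND_BGID buffer ring with one MSG_SIZE buffer
 * per message, connect to the receiver, optionally pre-fill the socket
 * with plain send() calls until EAGAIN (tracked in rd->to_eagain), then
 * hand off to the bundled or per-message send path.
 */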
static int do_send(struct recv_data *rd)
{
	struct sockaddr_in saddr;
	struct io_uring ring;
	unsigned long seq_buf[SEQ_SIZE], send_seq;
	struct io_uring_params p = { };
	struct io_uring_buf_ring *br;
	int sockfd, ret, len, i;
	socklen_t optlen;
	void *buf, *ptr;

	ret = io_uring_queue_init_params(16, &ring, &p);
	if (ret) {
		fprintf(stderr, "queue init failed: %d\n", ret);
		return 1;
	}
	if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) {
		no_send_mshot = 1;
		return 0;
	}

	if (posix_memalign(&buf, 4096, MSG_SIZE * nr_msgs))
		return 1;

	br = io_uring_setup_buf_ring(&ring, nr_msgs, SEND_BGID, 0, &ret);
	if (!br) {
		if (ret == -EINVAL) {
			fprintf(stderr, "einval on br setup\n");
			return 0;
		}
		fprintf(stderr, "failed setting up send ring %d\n", ret);
		return 1;
	}

	ptr = buf;
	for (i = 0; i < nr_msgs; i++) {
		io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, nr_msgs - 1, i);
		ptr += MSG_SIZE;
	}
	io_uring_buf_ring_advance(br, nr_msgs);

	memset(&saddr, 0, sizeof(saddr));
	saddr.sin_family = AF_INET;
	saddr.sin_port = htons(use_port);
	inet_pton(AF_INET, HOST, &saddr.sin_addr);

	if (use_tcp)
		sockfd = socket(AF_INET, SOCK_STREAM, 0);
	else
		sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0) {
		perror("socket");
		goto err2;
	}

	pthread_barrier_wait(&rd->connect);

	ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
	if (ret < 0) {
		perror("connect");
		goto err;
	}

	pthread_barrier_wait(&rd->startup);

	optlen = sizeof(len);
	len = 1024 * MSG_SIZE;
	setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &len, optlen);

	/* almost fill queue, leave room for one message */
	send_seq = 0;
	rd->to_eagain = 0;
	while (rd->max_sends && rd->max_sends--) {
		for (i = 0; i < SEQ_SIZE; i++)
			seq_buf[i] = send_seq++;

		ret = send(sockfd, seq_buf, sizeof(seq_buf), MSG_DONTWAIT);
		if (ret < 0) {
			if (errno == EAGAIN) {
				send_seq -= SEQ_SIZE;
				break;
			}
			perror("send");
			return 1;
		} else if (ret != sizeof(seq_buf)) {
			fprintf(stderr, "short %d send\n", ret);
			return 1;
		}

		rd->to_eagain++;
		rd->recv_bytes += sizeof(seq_buf);
	}

	ptr = buf;
	for (i = 0; i < nr_msgs; i++) {
		unsigned long *pseq = ptr;
		int j;

		for (j = 0; j < SEQ_SIZE; j++)
			pseq[j] = send_seq++;
		ptr += MSG_SIZE;
	}

	/* prepare more messages, sending with bundle */
	rd->recv_bytes += (nr_msgs * MSG_SIZE);
	if (rd->send_bundle && use_tcp)
		ret = __do_send_bundle(rd, &ring, sockfd);
	else
		ret = __do_send(rd, &ring, sockfd);
	if (ret)
		goto err;

	pthread_barrier_wait(&rd->finish);

	close(sockfd);
	io_uring_queue_exit(&ring);
	return 0;

err:
	close(sockfd);
err2:
	io_uring_queue_exit(&ring);
	pthread_barrier_wait(&rd->finish);
	return 1;
}

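/*
 * Run one sender/receiver pair. max_sends controls how far the socket is
 * pre-filled before the bundled sends; the resulting EAGAIN count is
 * reported back through to_eagain so later cases can aim for an almost
 * full socket. send_bundle/recv_bundle select which side uses bundles.
 */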
static int test(int backlog, unsigned int max_sends, int *to_eagain,
		int send_bundle, int recv_bundle)
{
	pthread_t recv_thread;
	struct recv_data rd;
	int ret;
	void *retval;

	/* backlog not reliable on UDP, skip it */
	if ((backlog || max_sends) && !use_tcp)
		return T_EXIT_PASS;

	memset(&rd, 0, sizeof(rd));
	pthread_barrier_init(&rd.connect, NULL, 2);
	pthread_barrier_init(&rd.startup, NULL, 2);
	pthread_barrier_init(&rd.barrier, NULL, 2);
	pthread_barrier_init(&rd.finish, NULL, 2);
	rd.max_sends = max_sends;
	if (to_eagain)
		*to_eagain = 0;

	rd.send_bundle = send_bundle;
	rd.recv_bundle = recv_bundle;

	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
	if (ret) {
		fprintf(stderr, "Thread create failed: %d\n", ret);
		return 1;
	}

	ret = do_send(&rd);
	if (no_send_mshot)
		return 0;

	if (ret)
		return ret;

	pthread_join(recv_thread, &retval);
	if (to_eagain)
		*to_eagain = rd.to_eagain;
	return (intptr_t)retval;
}

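/*
 * Run the full matrix of cases for the current protocol: plain, recv-only,
 * and send+recv bundles, then the full and almost-full socket variants,
 * first with NR_MIN_MSGS and (for TCP only) again with NR_MAX_MSGS.
 */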
static int run_tests(int is_udp)
{
	int ret, eagain_hit;

	nr_msgs = NR_MIN_MSGS;

	/* test basic send bundle first */
	ret = test(0, 0, NULL, 0, 0);
	if (ret) {
		fprintf(stderr, "test a failed\n");
		return T_EXIT_FAIL;
	}
	if (no_send_mshot)
		return T_EXIT_SKIP;

	/* test recv bundle */
	ret = test(0, 0, NULL, 0, 1);
	if (ret) {
		fprintf(stderr, "test b failed\n");
		return T_EXIT_FAIL;
	}

	/* test bundling recv and send */
	ret = test(0, 0, NULL, 1, 1);
	if (ret) {
		fprintf(stderr, "test c failed\n");
		return T_EXIT_FAIL;
	}

	/* test bundling with full socket */
	ret = test(1, 1000000, &eagain_hit, 1, 1);
	if (ret) {
		fprintf(stderr, "test d failed\n");
		return T_EXIT_FAIL;
	}

	/* test bundling with almost full socket */
	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
	if (ret) {
		fprintf(stderr, "test e failed\n");
		return T_EXIT_FAIL;
	}

	/* test recv bundle with almost full socket */
	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
	if (ret) {
		fprintf(stderr, "test f failed\n");
		return T_EXIT_FAIL;
	}

	if (is_udp)
		return T_EXIT_PASS;

	/* test send bundle with almost full socket */
	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
	if (ret) {
		fprintf(stderr, "test g failed\n");
		return T_EXIT_FAIL;
	}

	/* now repeat the last three tests, but with > FAST_UIOV segments */
	nr_msgs = NR_MAX_MSGS;

	/* test bundling with almost full socket */
	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
	if (ret) {
		fprintf(stderr, "test h failed\n");
		return T_EXIT_FAIL;
	}

	/* test recv bundle with almost full socket */
	ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
	if (ret) {
		fprintf(stderr, "test i failed\n");
		return T_EXIT_FAIL;
	}

	/* test send bundle with almost full socket */
	ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
	if (ret) {
		fprintf(stderr, "test j failed\n");
		return T_EXIT_FAIL;
	}

	return T_EXIT_PASS;
}

static int test_tcp(void)
{
	int ret;

	use_tcp = 1;
	ret = run_tests(false);
	if (ret == T_EXIT_FAIL)
		fprintf(stderr, "TCP test case failed\n");
	return ret;
}

static int test_udp(void)
{
	int ret;

	use_tcp = 0;
	use_port++;
	ret = run_tests(true);
	if (ret == T_EXIT_FAIL)
		fprintf(stderr, "UDP test case failed\n");
	return ret;
}

int main(int argc, char *argv[])
{
	int ret;

	if (argc > 1)
		return T_EXIT_SKIP;

	ret = test_tcp();
	if (ret != T_EXIT_PASS)
		return ret;

	ret = test_udp();
	if (ret != T_EXIT_PASS)
		return ret;

	return T_EXIT_PASS;
}