1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Simple test case showing using send and recv bundles with incremental
4 * buffer ring usage
5 */
6 #include <errno.h>
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <arpa/inet.h>
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <pthread.h>
15
/* Size in bytes of each message exchanged between sender and receiver */
#define MSG_SIZE 128
/* Message counts for the small and large (> FAST_UIOV) test passes */
#define NR_MIN_MSGS 4
#define NR_MAX_MSGS 32
/* Number of unsigned longs that fit in one message payload */
#define SEQ_SIZE (MSG_SIZE / sizeof(unsigned long))

/* Per-run message count; toggled between NR_MIN_MSGS and NR_MAX_MSGS */
static int nr_msgs;

/* Number of recv buffer IDs; power of two so the mask below works */
#define RECV_BIDS 8192
#define RECV_BID_MASK (RECV_BIDS - 1)
25
26 #include <liburing.h>
27
/* Exit codes reported back to the test harness */
enum t_test_result {
	T_EXIT_PASS = 0,
	T_EXIT_FAIL = 1,
	T_EXIT_SKIP = 77,	/* conventional "skipped" exit code */
};
33
/* Loopback endpoint the sender connects to */
#define PORT	10202
#define HOST	"127.0.0.1"

static int use_port = PORT;

/* Provided buffer group IDs for the send and recv rings */
#define SEND_BGID	7
#define RECV_BGID	8

/* Set when the kernel rejects bundle sends; the whole test is then skipped */
static int no_send_mshot;
43
/*
 * State shared between the sending (main) thread and the receiving
 * thread; one zero-initialized instance per test() invocation.
 */
struct recv_data {
	pthread_barrier_t connect;	/* both sides ready for connect/accept */
	pthread_barrier_t startup;	/* connection established on both ends */
	pthread_barrier_t barrier;	/* sender may start; recv about to arm */
	pthread_barrier_t finish;	/* transfer complete, safe to tear down */
	unsigned long seq;		/* next expected sequence number */
	int recv_bytes;			/* bytes the receiver still expects */
	int accept_fd;			/* accepted connection fd */
	int abort;			/* tells receiver to stop waiting */
	unsigned int max_sends;		/* budget of pre-fill send() calls */
	int to_eagain;			/* pre-fill sends completed before EAGAIN */
	void *recv_buf;			/* base of the provided recv buffer */

	int send_bundle;		/* use IORING_RECVSEND_BUNDLE on send */
	int recv_bundle;		/* use IORING_RECVSEND_BUNDLE on recv */
};
60
arm_recv(struct io_uring * ring,struct recv_data * rd)61 static int arm_recv(struct io_uring *ring, struct recv_data *rd)
62 {
63 struct io_uring_sqe *sqe;
64 int ret;
65
66 sqe = io_uring_get_sqe(ring);
67 io_uring_prep_recv_multishot(sqe, rd->accept_fd, NULL, 0, 0);
68 if (rd->recv_bundle)
69 sqe->ioprio |= IORING_RECVSEND_BUNDLE;
70 sqe->buf_group = RECV_BGID;
71 sqe->flags |= IOSQE_BUFFER_SELECT;
72 sqe->user_data = 2;
73
74 ret = io_uring_submit(ring);
75 if (ret != 1) {
76 fprintf(stderr, "submit failed: %d\n", ret);
77 return 1;
78 }
79
80 return 0;
81 }
82
recv_prep(struct io_uring * ring,struct recv_data * rd,int * sock)83 static int recv_prep(struct io_uring *ring, struct recv_data *rd, int *sock)
84 {
85 struct sockaddr_in saddr;
86 int sockfd, ret, val, use_fd;
87 socklen_t socklen;
88
89 memset(&saddr, 0, sizeof(saddr));
90 saddr.sin_family = AF_INET;
91 saddr.sin_addr.s_addr = htonl(INADDR_ANY);
92 saddr.sin_port = htons(use_port);
93
94 sockfd = socket(AF_INET, SOCK_STREAM, 0);
95 if (sockfd < 0) {
96 perror("socket");
97 return 1;
98 }
99
100 val = 1;
101 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
102
103 ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
104 if (ret < 0) {
105 perror("bind");
106 goto err;
107 }
108
109 ret = listen(sockfd, 1);
110 if (ret < 0) {
111 perror("listen");
112 goto err;
113 }
114
115 pthread_barrier_wait(&rd->connect);
116
117 socklen = sizeof(saddr);
118 use_fd = accept(sockfd, (struct sockaddr *)&saddr, &socklen);
119 if (use_fd < 0) {
120 perror("accept");
121 goto err;
122 }
123
124 rd->accept_fd = use_fd;
125 pthread_barrier_wait(&rd->startup);
126 pthread_barrier_wait(&rd->barrier);
127
128 if (arm_recv(ring, rd))
129 goto err;
130
131 *sock = sockfd;
132 return 0;
133 err:
134 close(sockfd);
135 return 1;
136 }
137
verify_seq(struct recv_data * rd,void * verify_ptr,int verify_sz,int start_bid)138 static int verify_seq(struct recv_data *rd, void *verify_ptr, int verify_sz,
139 int start_bid)
140 {
141 unsigned long *seqp;
142 int seq_size = verify_sz / sizeof(unsigned long);
143 int i;
144
145 seqp = verify_ptr;
146 for (i = 0; i < seq_size; i++) {
147 if (rd->seq != *seqp) {
148 fprintf(stderr, "bid=%d, got seq %lu, wanted %lu, offset %d\n", start_bid, *seqp, rd->seq, i);
149 return 0;
150 }
151 seqp++;
152 rd->seq++;
153 }
154
155 return 1;
156 }
157
recv_get_cqe(struct io_uring * ring,struct recv_data * rd,struct io_uring_cqe ** cqe)158 static int recv_get_cqe(struct io_uring *ring, struct recv_data *rd,
159 struct io_uring_cqe **cqe)
160 {
161 struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000LL };
162 int ret;
163
164 do {
165 ret = io_uring_wait_cqe_timeout(ring, cqe, &ts);
166 if (!ret)
167 return 0;
168 if (ret == -ETIME) {
169 if (rd->abort)
170 break;
171 continue;
172 }
173 fprintf(stderr, "wait recv: %d\n", ret);
174 break;
175 } while (1);
176
177 return 1;
178 }
179
do_recv(struct io_uring * ring,struct recv_data * rd)180 static int do_recv(struct io_uring *ring, struct recv_data *rd)
181 {
182 struct io_uring_cqe *cqe;
183 void *verify_ptr;
184 int verify_sz = 0;
185 int verify_bid = 0;
186 int bid;
187
188 verify_ptr = malloc(rd->recv_bytes);
189
190 do {
191 if (recv_get_cqe(ring, rd, &cqe))
192 break;
193 if (cqe->res == -EINVAL) {
194 fprintf(stdout, "recv not supported, skipping\n");
195 return 0;
196 }
197 if (cqe->res < 0) {
198 fprintf(stderr, "failed recv cqe: %d\n", cqe->res);
199 goto err;
200 }
201 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
202 fprintf(stderr, "no buffer set in recv\n");
203 goto err;
204 }
205 if (!(cqe->flags & IORING_CQE_F_BUF_MORE)) {
206 fprintf(stderr, "CQE_F_BUF_MORE not set\n");
207 goto err;
208 }
209 bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
210 if (bid != 0) {
211 fprintf(stderr, "got bid %d\n", bid);
212 goto err;
213 }
214 if (!(verify_sz % MSG_SIZE)) {
215 if (!verify_seq(rd, verify_ptr, verify_sz, verify_bid))
216 goto err;
217 verify_bid += verify_sz / MSG_SIZE;
218 verify_bid &= RECV_BID_MASK;
219 verify_sz = 0;
220 } else {
221 memcpy(verify_ptr + verify_sz, rd->recv_buf + (bid * MSG_SIZE), cqe->res);
222 verify_sz += cqe->res;
223 }
224 rd->recv_bytes -= cqe->res;
225 io_uring_cqe_seen(ring, cqe);
226 if (!(cqe->flags & IORING_CQE_F_MORE) && rd->recv_bytes) {
227 if (arm_recv(ring, rd))
228 goto err;
229 }
230 } while (rd->recv_bytes);
231
232 if (verify_sz && !(verify_sz % MSG_SIZE) &&
233 !verify_seq(rd, verify_ptr, verify_sz, verify_bid))
234 goto err;
235
236 pthread_barrier_wait(&rd->finish);
237 return 0;
238 err:
239 pthread_barrier_wait(&rd->finish);
240 return 1;
241 }
242
recv_fn(void * data)243 static void *recv_fn(void *data)
244 {
245 struct recv_data *rd = data;
246 struct io_uring_params p = { };
247 struct io_uring ring;
248 struct io_uring_buf_ring *br;
249 void *buf, *ptr;
250 int ret, sock;
251
252 p.cq_entries = 4096;
253 p.flags = IORING_SETUP_CQSIZE;
254 io_uring_queue_init_params(16, &ring, &p);
255
256 ret = 0;
257 if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
258 goto err;
259
260 br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, IOU_PBUF_RING_INC, &ret);
261 if (!br) {
262 fprintf(stderr, "failed setting up recv ring %d\n", ret);
263 goto err;
264 }
265
266 ptr = buf;
267 io_uring_buf_ring_add(br, ptr, MSG_SIZE * RECV_BIDS, 0, RECV_BID_MASK, 0);
268 io_uring_buf_ring_advance(br, 1);
269 rd->recv_buf = buf;
270
271 ret = recv_prep(&ring, rd, &sock);
272 if (ret) {
273 fprintf(stderr, "recv_prep failed: %d\n", ret);
274 goto err;
275 }
276
277 ret = do_recv(&ring, rd);
278
279 close(sock);
280 close(rd->accept_fd);
281 free(buf);
282 io_uring_queue_exit(&ring);
283 err:
284 return (void *)(intptr_t)ret;
285 }
286
__do_send_bundle(struct recv_data * rd,struct io_uring * ring,int sockfd)287 static int __do_send_bundle(struct recv_data *rd, struct io_uring *ring, int sockfd)
288 {
289 struct io_uring_cqe *cqe;
290 struct io_uring_sqe *sqe;
291 int bytes_needed = MSG_SIZE * nr_msgs;
292 int i, ret;
293
294 sqe = io_uring_get_sqe(ring);
295 io_uring_prep_send_bundle(sqe, sockfd, 0, 0);
296 sqe->flags |= IOSQE_BUFFER_SELECT;
297 sqe->buf_group = SEND_BGID;
298 sqe->user_data = 1;
299
300 ret = io_uring_submit(ring);
301 if (ret != 1)
302 return 1;
303
304 pthread_barrier_wait(&rd->barrier);
305
306 for (i = 0; i < nr_msgs; i++) {
307 ret = io_uring_wait_cqe(ring, &cqe);
308 if (ret) {
309 fprintf(stderr, "wait send: %d\n", ret);
310 return 1;
311 }
312 if (!i && cqe->res == -EINVAL) {
313 rd->abort = 1;
314 no_send_mshot = 1;
315 break;
316 }
317 if (cqe->res < 0) {
318 fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
319 return 1;
320 }
321 bytes_needed -= cqe->res;
322 if (!bytes_needed) {
323 io_uring_cqe_seen(ring, cqe);
324 break;
325 }
326 if (!(cqe->flags & IORING_CQE_F_MORE)) {
327 fprintf(stderr, "expected more, but MORE not set\n");
328 return 1;
329 }
330 io_uring_cqe_seen(ring, cqe);
331 }
332
333 return 0;
334 }
335
__do_send(struct recv_data * rd,struct io_uring * ring,int sockfd)336 static int __do_send(struct recv_data *rd, struct io_uring *ring, int sockfd)
337 {
338 struct io_uring_cqe *cqe;
339 struct io_uring_sqe *sqe;
340 int bytes_needed = MSG_SIZE * nr_msgs;
341 int i, ret;
342
343 for (i = 0; i < nr_msgs; i++) {
344 sqe = io_uring_get_sqe(ring);
345 io_uring_prep_send(sqe, sockfd, NULL, 0, 0);
346 sqe->user_data = 10 + i;
347 sqe->flags |= IOSQE_BUFFER_SELECT;
348 sqe->buf_group = SEND_BGID;
349
350 ret = io_uring_submit(ring);
351 if (ret != 1)
352 return 1;
353
354 if (!i)
355 pthread_barrier_wait(&rd->barrier);
356 ret = io_uring_wait_cqe(ring, &cqe);
357 if (ret) {
358 fprintf(stderr, "send wait cqe %d\n", ret);
359 return 1;
360 }
361
362 if (!i && cqe->res == -EINVAL) {
363 rd->abort = 1;
364 no_send_mshot = 1;
365 break;
366 }
367 if (cqe->res != MSG_SIZE) {
368 fprintf(stderr, "send failed cqe: %d\n", cqe->res);
369 return 1;
370 }
371 if (cqe->res < 0) {
372 fprintf(stderr, "bad send cqe res: %d\n", cqe->res);
373 return 1;
374 }
375 bytes_needed -= cqe->res;
376 io_uring_cqe_seen(ring, cqe);
377 if (!bytes_needed)
378 break;
379 }
380
381 return 0;
382 }
383
do_send(struct recv_data * rd)384 static int do_send(struct recv_data *rd)
385 {
386 struct sockaddr_in saddr;
387 struct io_uring ring;
388 unsigned long seq_buf[SEQ_SIZE], send_seq;
389 struct io_uring_params p = { };
390 struct io_uring_buf_ring *br;
391 int sockfd, ret, len, i;
392 socklen_t optlen;
393 void *buf, *ptr;
394
395 ret = io_uring_queue_init_params(16, &ring, &p);
396 if (ret) {
397 fprintf(stderr, "queue init failed: %d\n", ret);
398 return 1;
399 }
400 if (!(p.features & IORING_FEAT_RECVSEND_BUNDLE)) {
401 no_send_mshot = 1;
402 return 0;
403 }
404
405 if (posix_memalign(&buf, 4096, MSG_SIZE * nr_msgs))
406 return 1;
407
408 br = io_uring_setup_buf_ring(&ring, nr_msgs, SEND_BGID, 0, &ret);
409 if (!br) {
410 if (ret == -EINVAL) {
411 fprintf(stderr, "einval on br setup\n");
412 return 0;
413 }
414 fprintf(stderr, "failed setting up send ring %d\n", ret);
415 return 1;
416 }
417
418 ptr = buf;
419 for (i = 0; i < nr_msgs; i++) {
420 io_uring_buf_ring_add(br, ptr, MSG_SIZE, i, nr_msgs - 1, i);
421 ptr += MSG_SIZE;
422 }
423 io_uring_buf_ring_advance(br, nr_msgs);
424
425 memset(&saddr, 0, sizeof(saddr));
426 saddr.sin_family = AF_INET;
427 saddr.sin_port = htons(use_port);
428 inet_pton(AF_INET, HOST, &saddr.sin_addr);
429
430 sockfd = socket(AF_INET, SOCK_STREAM, 0);
431 if (sockfd < 0) {
432 perror("socket");
433 goto err2;
434 }
435
436 pthread_barrier_wait(&rd->connect);
437
438 ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
439 if (ret < 0) {
440 perror("connect");
441 goto err;
442 }
443
444 pthread_barrier_wait(&rd->startup);
445
446 optlen = sizeof(len);
447 len = 1024 * MSG_SIZE;
448 setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &len, optlen);
449
450 /* almost fill queue, leave room for one message */
451 send_seq = 0;
452 rd->to_eagain = 0;
453 while (rd->max_sends && rd->max_sends--) {
454 for (i = 0; i < SEQ_SIZE; i++)
455 seq_buf[i] = send_seq++;
456
457 ret = send(sockfd, seq_buf, sizeof(seq_buf), MSG_DONTWAIT);
458 if (ret < 0) {
459 if (errno == EAGAIN) {
460 send_seq -= SEQ_SIZE;
461 break;
462 }
463 perror("send");
464 return 1;
465 } else if (ret != sizeof(seq_buf)) {
466 fprintf(stderr, "short %d send\n", ret);
467 return 1;
468 }
469
470 rd->to_eagain++;
471 rd->recv_bytes += sizeof(seq_buf);
472 }
473
474 ptr = buf;
475 for (i = 0; i < nr_msgs; i++) {
476 unsigned long *pseq = ptr;
477 int j;
478
479 for (j = 0; j < SEQ_SIZE; j++)
480 pseq[j] = send_seq++;
481 ptr += MSG_SIZE;
482 }
483
484 /* prepare more messages, sending with bundle */
485 rd->recv_bytes += (nr_msgs * MSG_SIZE);
486 if (rd->send_bundle)
487 ret = __do_send_bundle(rd, &ring, sockfd);
488 else
489 ret = __do_send(rd, &ring, sockfd);
490 if (ret)
491 goto err;
492
493 pthread_barrier_wait(&rd->finish);
494
495 close(sockfd);
496 free(buf);
497 io_uring_queue_exit(&ring);
498 return 0;
499
500 err:
501 close(sockfd);
502 err2:
503 io_uring_queue_exit(&ring);
504 pthread_barrier_wait(&rd->finish);
505 return 1;
506 }
507
/*
 * Run one send/recv pairing: spawn the receiver thread, drive the send
 * side, and propagate the receiver's result. On success *to_eagain (if
 * non-NULL) is set to the number of pre-fill sends that completed
 * before EAGAIN. Returns 0 on success/skip, non-zero on failure.
 */
static int test(int backlog, unsigned int max_sends, int *to_eagain,
		int send_bundle, int recv_bundle)
{
	pthread_t recv_thread;
	struct recv_data rd;
	int ret;
	void *retval;

	memset(&rd, 0, sizeof(rd));
	/* every barrier pairs this thread with the receiver thread */
	pthread_barrier_init(&rd.connect, NULL, 2);
	pthread_barrier_init(&rd.startup, NULL, 2);
	pthread_barrier_init(&rd.barrier, NULL, 2);
	pthread_barrier_init(&rd.finish, NULL, 2);
	rd.max_sends = max_sends;
	if (to_eagain)
		*to_eagain = 0;

	rd.send_bundle = send_bundle;
	rd.recv_bundle = recv_bundle;

	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
	if (ret) {
		fprintf(stderr, "Thread create failed: %d\n", ret);
		return 1;
	}

	ret = do_send(&rd);
	/*
	 * NOTE(review): on the skip path we return without joining
	 * recv_thread, which may still reference stack-allocated rd;
	 * this relies on rd.abort plus prompt process exit. The barriers
	 * are also never destroyed, and the backlog parameter is unused.
	 * Confirm against the upstream test before changing.
	 */
	if (no_send_mshot)
		return 0;

	if (ret)
		return ret;

	pthread_join(recv_thread, &retval);
	if (to_eagain)
		*to_eagain = rd.to_eagain;
	return (intptr_t)retval;
}
546
run_tests(void)547 static int run_tests(void)
548 {
549 int ret, eagain_hit;
550
551 nr_msgs = NR_MIN_MSGS;
552
553 /* test basic send bundle first */
554 ret = test(0, 0, NULL, 0, 0);
555 if (ret) {
556 fprintf(stderr, "test a failed\n");
557 return T_EXIT_FAIL;
558 }
559 if (no_send_mshot)
560 return T_EXIT_SKIP;
561
562 /* test recv bundle */
563 ret = test(0, 0, NULL, 0, 1);
564 if (ret) {
565 fprintf(stderr, "test b failed\n");
566 return T_EXIT_FAIL;
567 }
568
569 /* test bundling recv and send */
570 ret = test(0, 0, NULL, 1, 1);
571 if (ret) {
572 fprintf(stderr, "test c failed\n");
573 return T_EXIT_FAIL;
574 }
575
576 /* test bundling with full socket */
577 ret = test(1, 1000000, &eagain_hit, 1, 1);
578 if (ret) {
579 fprintf(stderr, "test d failed\n");
580 return T_EXIT_FAIL;
581 }
582
583 /* test bundling with almost full socket */
584 ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
585 if (ret) {
586 fprintf(stderr, "test e failed\n");
587 return T_EXIT_FAIL;
588 }
589
590 /* test recv bundle with almost full socket */
591 ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
592 if (ret) {
593 fprintf(stderr, "test f failed\n");
594 return T_EXIT_FAIL;
595 }
596
597 /* test send bundle with almost full socket */
598 ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
599 if (ret) {
600 fprintf(stderr, "test g failed\n");
601 return T_EXIT_FAIL;
602 }
603
604 /* now repeat the last three tests, but with > FAST_UIOV segments */
605 nr_msgs = NR_MAX_MSGS;
606
607 /* test bundling with almost full socket */
608 ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 1, 1);
609 if (ret) {
610 fprintf(stderr, "test h failed\n");
611 return T_EXIT_FAIL;
612 }
613
614 /* test recv bundle with almost full socket */
615 ret = test(1, eagain_hit - (nr_msgs / 2), NULL, 0, 1);
616 if (ret) {
617 fprintf(stderr, "test i failed\n");
618 return T_EXIT_FAIL;
619 }
620
621 /* test send bundle with almost full socket */
622 ret = test(1, eagain_hit - (nr_msgs / 2), &eagain_hit, 1, 0);
623 if (ret) {
624 fprintf(stderr, "test j failed\n");
625 return T_EXIT_FAIL;
626 }
627
628 return T_EXIT_PASS;
629 }
630
test_tcp(void)631 static int test_tcp(void)
632 {
633 int ret;
634
635 ret = run_tests();
636 if (ret == T_EXIT_FAIL)
637 fprintf(stderr, "TCP test case failed\n");
638 return ret;
639 }
640
has_pbuf_ring_inc(void)641 static bool has_pbuf_ring_inc(void)
642 {
643 struct io_uring_buf_ring *br;
644 bool has_pbuf_inc = false;
645 struct io_uring ring;
646 void *buf;
647 int ret;
648
649 ret = io_uring_queue_init(1, &ring, 0);
650 if (ret)
651 return false;
652
653 if (posix_memalign(&buf, 4096, MSG_SIZE * RECV_BIDS))
654 return false;
655
656 br = io_uring_setup_buf_ring(&ring, RECV_BIDS, RECV_BGID, IOU_PBUF_RING_INC, &ret);
657 if (br) {
658 has_pbuf_inc = true;
659 io_uring_unregister_buf_ring(&ring, RECV_BGID);
660 }
661 io_uring_queue_exit(&ring);
662 free(buf);
663 return has_pbuf_inc;
664 }
665
main(int argc,char * argv[])666 int main(int argc, char *argv[])
667 {
668 int ret;
669
670 if (argc > 1)
671 return T_EXIT_SKIP;
672 if (!has_pbuf_ring_inc())
673 return T_EXIT_SKIP;
674
675 ret = test_tcp();
676 if (ret != T_EXIT_PASS)
677 return ret;
678
679 return T_EXIT_PASS;
680 }
681