1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Description: test multishot read (IORING_OP_READ_MULTISHOT) on pipes,
4 * using ring provided buffers
5 *
6 */
7 #include <errno.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <fcntl.h>
13
14 #include "liburing.h"
15 #include "helpers.h"
16
/* Size in bytes of a regular provided buffer */
#define BUF_SIZE 32
/* Size in bytes of the first two provided buffers in the non-incremental test */
#define BUF_SIZE_FIRST 17
/* Number of entries in the provided buffer ring */
#define NR_BUFS 64
/* Buffer group ID used for every buffer ring in this file */
#define BUF_BGID 1

/* Index mask handed to io_uring_buf_ring_add() for an NR_BUFS sized ring */
#define BR_MASK (NR_BUFS - 1)

/* CQ ring size used by test() when its 'overflow' argument is set */
#define NR_OVERFLOW (NR_BUFS / 4)

/*
 * Feature probes. Each flag is set by a test when the corresponding setup
 * step or request fails with -EINVAL, and is checked to skip later tests.
 */
static int no_buf_ring, no_read_mshot, no_buf_ring_inc;
27
arm_read(struct io_uring * ring,int fd,int use_mshot)28 static void arm_read(struct io_uring *ring, int fd, int use_mshot)
29 {
30 struct io_uring_sqe *sqe;
31
32 sqe = io_uring_get_sqe(ring);
33 if (use_mshot) {
34 io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
35 } else {
36 io_uring_prep_read(sqe, fd, NULL, 0, 0);
37 sqe->flags = IOSQE_BUFFER_SELECT;
38 sqe->buf_group = BUF_BGID;
39 }
40
41 io_uring_submit(ring);
42 }
43
test_inc(int use_mshot,int flags)44 static int test_inc(int use_mshot, int flags)
45 {
46 struct io_uring_buf_ring *br;
47 struct io_uring_params p = { };
48 struct io_uring_cqe *cqe;
49 struct io_uring ring;
50 int nbytes = 65536;
51 int ret, fds[2], i;
52 char tmp[31];
53 char *buf;
54 void *ptr;
55 int bid = -1;
56 int bid_bytes;
57
58 if (no_buf_ring)
59 return 0;
60
61 p.flags = flags;
62 ret = io_uring_queue_init_params(64, &ring, &p);
63 if (ret) {
64 fprintf(stderr, "ring setup failed: %d\n", ret);
65 return 1;
66 }
67
68 if (pipe(fds) < 0) {
69 perror("pipe");
70 return 1;
71 }
72
73 if (posix_memalign((void **) &buf, 4096, 65536))
74 return 1;
75
76 br = io_uring_setup_buf_ring(&ring, 32, BUF_BGID, IOU_PBUF_RING_INC, &ret);
77 if (!br) {
78 if (ret == -EINVAL) {
79 no_buf_ring_inc = 1;
80 free(buf);
81 return 0;
82 }
83 fprintf(stderr, "Buffer ring register failed %d\n", ret);
84 return 1;
85 }
86
87 ptr = buf;
88 buf = ptr + 65536 - 2048;
89 for (i = 0; i < 32; i++) {
90 io_uring_buf_ring_add(br, buf, 2048, i, 31, i);
91 buf -= 2048;
92 }
93 io_uring_buf_ring_advance(br, 32);
94
95 memset(tmp, 0x5a, sizeof(tmp));
96
97 arm_read(&ring, fds[0], use_mshot);
98
99 bid_bytes = 0;
100 do {
101 int write_size = sizeof(tmp);
102
103 if (write_size > nbytes)
104 write_size = nbytes;
105
106 io_uring_get_events(&ring);
107 ret = io_uring_peek_cqe(&ring, &cqe);
108 if (!ret) {
109 int this_bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
110 if (bid == -1) {
111 bid = this_bid;
112 } else if (bid != this_bid) {
113 if (bid_bytes != 2048) {
114 fprintf(stderr, "unexpected bid bytes %d\n",
115 bid_bytes);
116 return 1;
117 }
118 bid = this_bid;
119 bid_bytes = 0;
120 }
121 bid_bytes += cqe->res;
122 nbytes -= cqe->res;
123 if (!(cqe->flags & IORING_CQE_F_MORE))
124 arm_read(&ring, fds[0], use_mshot);
125 io_uring_cqe_seen(&ring, cqe);
126 if (!nbytes)
127 break;
128 }
129 usleep(1000);
130 ret = write(fds[1], tmp, write_size);
131 if (ret < 0) {
132 perror("write");
133 return 1;
134 } else if (ret != write_size) {
135 printf("short write %d\n", ret);
136 return 1;
137 }
138 } while (nbytes);
139
140 if (bid_bytes) {
141 if (bid_bytes != 2048) {
142 fprintf(stderr, "unexpected bid bytes %d\n", bid_bytes);
143 return 1;
144 }
145 }
146
147 io_uring_free_buf_ring(&ring, br, 32, BUF_BGID);
148 io_uring_queue_exit(&ring);
149 free(ptr);
150 close(fds[0]);
151 close(fds[1]);
152 return 0;
153 }
154
test_clamp(void)155 static int test_clamp(void)
156 {
157 struct io_uring_buf_ring *br;
158 struct io_uring_params p = { };
159 struct io_uring_sqe *sqe;
160 struct io_uring_cqe *cqe;
161 struct io_uring ring;
162 int ret, fds[2], i;
163 char tmp[32];
164 char *buf;
165 void *ptr;
166
167 ret = io_uring_queue_init_params(4, &ring, &p);
168 if (ret) {
169 fprintf(stderr, "ring setup failed: %d\n", ret);
170 return 1;
171 }
172
173 if (pipe(fds) < 0) {
174 perror("pipe");
175 return 1;
176 }
177
178 if (posix_memalign((void **) &buf, 4096, NR_BUFS * BUF_SIZE))
179 return 1;
180
181 br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
182 if (!br) {
183 if (ret == -EINVAL) {
184 no_buf_ring = 1;
185 return 0;
186 }
187 fprintf(stderr, "Buffer ring register failed %d\n", ret);
188 return 1;
189 }
190
191 ptr = buf;
192 io_uring_buf_ring_add(br, buf, 16, 1, BR_MASK, 0);
193 buf += 16;
194 io_uring_buf_ring_add(br, buf, 32, 2, BR_MASK, 1);
195 buf += 32;
196 io_uring_buf_ring_add(br, buf, 32, 3, BR_MASK, 2);
197 buf += 32;
198 io_uring_buf_ring_add(br, buf, 32, 4, BR_MASK, 3);
199 buf += 32;
200 io_uring_buf_ring_advance(br, 4);
201
202 memset(tmp, 0xaa, sizeof(tmp));
203
204 sqe = io_uring_get_sqe(&ring);
205 io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
206
207 ret = io_uring_submit(&ring);
208 if (ret != 1) {
209 fprintf(stderr, "submit: %d\n", ret);
210 return 1;
211 }
212
213 /* prevent pipe buffer merging */
214 usleep(1000);
215 ret = write(fds[1], tmp, 16);
216
217 usleep(1000);
218 ret = write(fds[1], tmp, sizeof(tmp));
219
220 /* prevent pipe buffer merging */
221 usleep(1000);
222 ret = write(fds[1], tmp, 16);
223
224 usleep(1000);
225 ret = write(fds[1], tmp, sizeof(tmp));
226
227 /*
228 * We should see a 16 byte completion, then a 32 byte, then a 16 byte,
229 * and finally a 32 byte again.
230 */
231 for (i = 0; i < 4; i++) {
232 ret = io_uring_wait_cqe(&ring, &cqe);
233 if (ret) {
234 fprintf(stderr, "wait cqe failed %d\n", ret);
235 return 1;
236 }
237 if (cqe->res < 0) {
238 fprintf(stderr, "cqe res: %d\n", cqe->res);
239 return 1;
240 }
241 if (!(cqe->flags & IORING_CQE_F_MORE)) {
242 fprintf(stderr, "no more cqes\n");
243 return 1;
244 }
245 if (i == 0 || i == 2) {
246 if (cqe->res != 16) {
247 fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
248 return 1;
249 }
250 } else if (i == 1 || i == 3) {
251 if (cqe->res != 32) {
252 fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
253 return 1;
254 }
255 }
256 io_uring_cqe_seen(&ring, cqe);
257 }
258
259 io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID);
260 io_uring_queue_exit(&ring);
261 free(ptr);
262 return 0;
263 }
264
test(int first_good,int async,int overflow,int incremental)265 static int test(int first_good, int async, int overflow, int incremental)
266 {
267 struct io_uring_buf_ring *br;
268 struct io_uring_params p = { };
269 struct io_uring_sqe *sqe;
270 struct io_uring_cqe *cqe;
271 struct io_uring ring;
272 int ret, fds[2], i, start_msg = 0;
273 int br_flags = 0;
274 char tmp[32];
275 void *ptr[NR_BUFS];
276 char *inc_index;
277
278 p.flags = IORING_SETUP_CQSIZE;
279 if (!overflow)
280 p.cq_entries = NR_BUFS + 1;
281 else
282 p.cq_entries = NR_OVERFLOW;
283 ret = io_uring_queue_init_params(1, &ring, &p);
284 if (ret) {
285 fprintf(stderr, "ring setup failed: %d\n", ret);
286 return 1;
287 }
288
289 if (incremental) {
290 if (no_buf_ring_inc)
291 return 0;
292 br_flags |= IOU_PBUF_RING_INC;
293 }
294
295 br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, br_flags, &ret);
296 if (!br) {
297 if (ret == -EINVAL) {
298 if (incremental) {
299 no_buf_ring_inc = 1;
300 return 0;
301 }
302 no_buf_ring = 1;
303 return 0;
304 }
305 fprintf(stderr, "Buffer ring register failed %d\n", ret);
306 return 1;
307 }
308
309 if (pipe(fds) < 0) {
310 perror("pipe");
311 return 1;
312 }
313
314 if (!incremental) {
315 for (i = 0; i < NR_BUFS; i++) {
316 unsigned size = i <= 1 ? BUF_SIZE_FIRST : BUF_SIZE;
317 ptr[i] = malloc(size);
318 if (!ptr[i])
319 return 1;
320 io_uring_buf_ring_add(br, ptr[i], size, i + 1, BR_MASK, i);
321 }
322 inc_index = NULL;
323 io_uring_buf_ring_advance(br, NR_BUFS);
324 } else {
325 inc_index = ptr[0] = malloc(NR_BUFS * BUF_SIZE);
326 memset(inc_index, 0, NR_BUFS * BUF_SIZE);
327 io_uring_buf_ring_add(br, ptr[0], NR_BUFS * BUF_SIZE, 1, BR_MASK, 0);
328 io_uring_buf_ring_advance(br, 1);
329 }
330
331 if (first_good) {
332 sprintf(tmp, "this is buffer %d\n", start_msg++);
333 ret = write(fds[1], tmp, strlen(tmp));
334 }
335
336 sqe = io_uring_get_sqe(&ring);
337 /* len == 0 means just use the defined provided buffer length */
338 io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
339 if (async)
340 sqe->flags |= IOSQE_ASYNC;
341
342 ret = io_uring_submit(&ring);
343 if (ret != 1) {
344 fprintf(stderr, "submit: %d\n", ret);
345 return 1;
346 }
347
348 /* write NR_BUFS + 1, or if first_good is set, NR_BUFS */
349 for (i = 0; i < NR_BUFS + !first_good; i++) {
350 /* prevent pipe buffer merging */
351 usleep(1000);
352 sprintf(tmp, "this is buffer %d\n", i + start_msg);
353 ret = write(fds[1], tmp, strlen(tmp));
354 if (ret != strlen(tmp)) {
355 fprintf(stderr, "write ret %d\n", ret);
356 return 1;
357 }
358 }
359
360 for (i = 0; i < NR_BUFS + 1; i++) {
361 int bid;
362
363 ret = io_uring_wait_cqe(&ring, &cqe);
364 if (ret) {
365 fprintf(stderr, "wait cqe failed %d\n", ret);
366 return 1;
367 }
368 if (cqe->res < 0) {
369 /* expected failure as we try to read one too many */
370 if (cqe->res == -ENOBUFS && i == NR_BUFS)
371 break;
372 if (!i && cqe->res == -EINVAL) {
373 no_read_mshot = 1;
374 break;
375 }
376 fprintf(stderr, "%d: cqe res %d\n", i, cqe->res);
377 return 1;
378 } else if (i > 9 && cqe->res <= 17) {
379 fprintf(stderr, "truncated message %d %d\n", i, cqe->res);
380 return 1;
381 }
382
383 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
384 fprintf(stderr, "no buffer selected\n");
385 return 1;
386 }
387 bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
388 if (incremental && bid != 1) {
389 fprintf(stderr, "bid %d for incremental\n", bid);
390 return 1;
391 }
392 if (incremental && !first_good) {
393 char out_buf[64];
394 sprintf(out_buf, "this is buffer %d\n", i + start_msg);
395 if (strncmp(inc_index, out_buf, strlen(out_buf)))
396 return 1;
397 inc_index += cqe->res;
398 }
399 if (!(cqe->flags & IORING_CQE_F_MORE)) {
400 /* we expect this on overflow */
401 if (overflow && i >= NR_OVERFLOW)
402 break;
403 fprintf(stderr, "no more cqes\n");
404 return 1;
405 }
406 /* should've overflown! */
407 if (overflow && i > NR_OVERFLOW) {
408 fprintf(stderr, "Expected overflow!\n");
409 return 1;
410 }
411 io_uring_cqe_seen(&ring, cqe);
412 }
413
414
415 io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID);
416 io_uring_queue_exit(&ring);
417 if (incremental) {
418 free(ptr[0]);
419 } else {
420 for (i = 0; i < NR_BUFS; i++)
421 free(ptr[i]);
422 }
423 return 0;
424 }
425
test_invalid(int async)426 static int test_invalid(int async)
427 {
428 struct io_uring_buf_ring *br;
429 struct io_uring_params p = { };
430 struct io_uring_sqe *sqe;
431 struct io_uring_cqe *cqe;
432 struct io_uring ring;
433 char fname[32] = ".mshot.%d.XXXXXX";
434 int ret, fd;
435 char *buf;
436
437 p.flags = IORING_SETUP_CQSIZE;
438 p.cq_entries = NR_BUFS;
439 ret = io_uring_queue_init_params(1, &ring, &p);
440 if (ret) {
441 fprintf(stderr, "ring setup failed: %d\n", ret);
442 return 1;
443 }
444
445 fd = mkstemp(fname);
446 if (fd < 0) {
447 perror("mkstemp");
448 return 1;
449 }
450 unlink(fname);
451
452 if (posix_memalign((void **) &buf, 4096, BUF_SIZE))
453 return 1;
454
455 br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret);
456 if (!br) {
457 fprintf(stderr, "Buffer ring register failed %d\n", ret);
458 return 1;
459 }
460
461 io_uring_buf_ring_add(br, buf, BUF_SIZE, 1, BR_MASK, 0);
462 io_uring_buf_ring_advance(br, 1);
463
464 sqe = io_uring_get_sqe(&ring);
465 /* len == 0 means just use the defined provided buffer length */
466 io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
467 if (async)
468 sqe->flags |= IOSQE_ASYNC;
469
470 ret = io_uring_submit(&ring);
471 if (ret != 1) {
472 fprintf(stderr, "submit: %d\n", ret);
473 return 1;
474 }
475
476 ret = io_uring_wait_cqe(&ring, &cqe);
477 if (ret) {
478 fprintf(stderr, "wait cqe failed %d\n", ret);
479 return 1;
480 }
481 if (cqe->flags & IORING_CQE_F_MORE) {
482 fprintf(stderr, "MORE flag set unexpected %d\n", cqe->flags);
483 return 1;
484 }
485 if (cqe->res != -EBADFD) {
486 fprintf(stderr, "Got cqe res %d, wanted -EBADFD\n", cqe->res);
487 return 1;
488 }
489
490 io_uring_cqe_seen(&ring, cqe);
491 io_uring_free_buf_ring(&ring, br, 1, BUF_BGID);
492 io_uring_queue_exit(&ring);
493 free(buf);
494 return 0;
495 }
496
main(int argc,char * argv[])497 int main(int argc, char *argv[])
498 {
499 int ret;
500
501 if (argc > 1)
502 return T_EXIT_SKIP;
503
504 ret = test(0, 0, 0, 0);
505 if (ret) {
506 fprintf(stderr, "test 0 0 0 failed\n");
507 return T_EXIT_FAIL;
508 }
509 if (no_buf_ring || no_read_mshot) {
510 printf("skip\n");
511 return T_EXIT_SKIP;
512 }
513
514 ret = test(0, 1, 0, 0);
515 if (ret) {
516 fprintf(stderr, "test 0 1 0, failed\n");
517 return T_EXIT_FAIL;
518 }
519
520 ret = test(1, 0, 0, 0);
521 if (ret) {
522 fprintf(stderr, "test 1 0 0 failed\n");
523 return T_EXIT_FAIL;
524 }
525
526 ret = test(0, 0, 1, 0);
527 if (ret) {
528 fprintf(stderr, "test 0 0 1 failed\n");
529 return T_EXIT_FAIL;
530 }
531
532 ret = test(0, 1, 1, 0);
533 if (ret) {
534 fprintf(stderr, "test 0 1 1 failed\n");
535 return T_EXIT_FAIL;
536 }
537
538 ret = test(1, 0, 1, 0);
539 if (ret) {
540 fprintf(stderr, "test 1 0 1, failed\n");
541 return T_EXIT_FAIL;
542 }
543
544 ret = test(1, 0, 1, 0);
545 if (ret) {
546 fprintf(stderr, "test 1 0 1 failed\n");
547 return T_EXIT_FAIL;
548 }
549
550 ret = test(1, 1, 1, 0);
551 if (ret) {
552 fprintf(stderr, "test 1 1 1 failed\n");
553 return T_EXIT_FAIL;
554 }
555
556 ret = test(0, 0, 0, 1);
557 if (ret) {
558 fprintf(stderr, "test 0 0 0 1 failed\n");
559 return T_EXIT_FAIL;
560 }
561
562 ret = test(0, 0, 1, 1);
563 if (ret) {
564 fprintf(stderr, "test 0 0 1 1 failed\n");
565 return T_EXIT_FAIL;
566 }
567
568 ret = test(0, 1, 0, 1);
569 if (ret) {
570 fprintf(stderr, "test 0 1 0 1 failed\n");
571 return T_EXIT_FAIL;
572 }
573
574 ret = test(0, 1, 1, 1);
575 if (ret) {
576 fprintf(stderr, "test 0 1 1 1 failed\n");
577 return T_EXIT_FAIL;
578 }
579
580 ret = test(1, 0, 0, 1);
581 if (ret) {
582 fprintf(stderr, "test 1 0 0 1 failed\n");
583 return T_EXIT_FAIL;
584 }
585
586 ret = test(1, 0, 1, 1);
587 if (ret) {
588 fprintf(stderr, "test 1 0 1 1 failed\n");
589 return T_EXIT_FAIL;
590 }
591
592 ret = test(1, 1, 0, 1);
593 if (ret) {
594 fprintf(stderr, "test 1 1 0 1 failed\n");
595 return T_EXIT_FAIL;
596 }
597
598 ret = test(1, 1, 1, 1);
599 if (ret) {
600 fprintf(stderr, "test 1 1 1 1 failed\n");
601 return T_EXIT_FAIL;
602 }
603
604 ret = test_invalid(0);
605 if (ret) {
606 fprintf(stderr, "test_invalid 0 failed\n");
607 return T_EXIT_FAIL;
608 }
609
610 ret = test_invalid(1);
611 if (ret) {
612 fprintf(stderr, "test_invalid 1 failed\n");
613 return T_EXIT_FAIL;
614 }
615
616 ret = test_clamp();
617 if (ret) {
618 fprintf(stderr, "test_clamp failed\n");
619 return T_EXIT_FAIL;
620 }
621
622 ret = test_inc(0, 0);
623 if (ret) {
624 fprintf(stderr, "test_inc 0 0 failed\n");
625 return T_EXIT_FAIL;
626 }
627
628 ret = test_inc(0, IORING_SETUP_SQPOLL);
629 if (ret) {
630 fprintf(stderr, "test_inc 0 sqpoll failed\n");
631 return T_EXIT_FAIL;
632 }
633
634 ret = test_inc(0, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
635 if (ret) {
636 fprintf(stderr, "test_inc 0 defer failed\n");
637 return T_EXIT_FAIL;
638 }
639
640 ret = test_inc(1, 0);
641 if (ret) {
642 fprintf(stderr, "test_inc 1 0 failed\n");
643 return T_EXIT_FAIL;
644 }
645
646 ret = test_inc(1, IORING_SETUP_SQPOLL);
647 if (ret) {
648 fprintf(stderr, "test_inc 1 sqpoll failed\n");
649 return T_EXIT_FAIL;
650 }
651
652 ret = test_inc(1, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
653 if (ret) {
654 fprintf(stderr, "test_inc 1 defer failed\n");
655 return T_EXIT_FAIL;
656 }
657
658 return T_EXIT_PASS;
659 }
660