/* SPDX-License-Identifier: MIT */
/*
 * Description: test multishot read (IORING_OP_READ_MULTISHOT) on pipes,
 *		using ring provided buffers
 *
 */
7 #include <errno.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <fcntl.h>
13 
14 #include "liburing.h"
15 #include "helpers.h"
16 
17 #define BUF_SIZE	32
18 #define BUF_SIZE_FIRST	17
19 #define NR_BUFS		64
20 #define BUF_BGID	1
21 
22 #define BR_MASK		(NR_BUFS - 1)
23 
24 #define NR_OVERFLOW	(NR_BUFS / 4)
25 
26 static int no_buf_ring, no_read_mshot, no_buf_ring_inc;
27 
arm_read(struct io_uring * ring,int fd,int use_mshot)28 static void arm_read(struct io_uring *ring, int fd, int use_mshot)
29 {
30 	struct io_uring_sqe *sqe;
31 
32 	sqe = io_uring_get_sqe(ring);
33 	if (use_mshot) {
34 		io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
35 	} else {
36 		io_uring_prep_read(sqe, fd, NULL, 0, 0);
37 		sqe->flags = IOSQE_BUFFER_SELECT;
38 		sqe->buf_group = BUF_BGID;
39 	}
40 
41 	io_uring_submit(ring);
42 }
43 
test_inc(int use_mshot,int flags)44 static int test_inc(int use_mshot, int flags)
45 {
46 	struct io_uring_buf_ring *br;
47 	struct io_uring_params p = { };
48 	struct io_uring_cqe *cqe;
49 	struct io_uring ring;
50 	int nbytes = 65536;
51 	int ret, fds[2], i;
52 	char tmp[31];
53 	char *buf;
54 	void *ptr;
55 	int bid = -1;
56 	int bid_bytes;
57 
58 	if (no_buf_ring)
59 		return 0;
60 
61 	p.flags = flags;
62 	ret = io_uring_queue_init_params(64, &ring, &p);
63 	if (ret) {
64 		fprintf(stderr, "ring setup failed: %d\n", ret);
65 		return 1;
66 	}
67 
68 	if (pipe(fds) < 0) {
69 		perror("pipe");
70 		return 1;
71 	}
72 
73 	if (posix_memalign((void **) &buf, 4096, 65536))
74 		return 1;
75 
76 	br = io_uring_setup_buf_ring(&ring, 32, BUF_BGID, IOU_PBUF_RING_INC, &ret);
77 	if (!br) {
78 		if (ret == -EINVAL) {
79 			no_buf_ring_inc = 1;
80 			free(buf);
81 			return 0;
82 		}
83 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
84 		return 1;
85 	}
86 
87 	ptr = buf;
88 	buf = ptr + 65536 - 2048;
89 	for (i = 0; i < 32; i++) {
90 		io_uring_buf_ring_add(br, buf, 2048, i, 31, i);
91 		buf -= 2048;
92 	}
93 	io_uring_buf_ring_advance(br, 32);
94 
95 	memset(tmp, 0x5a, sizeof(tmp));
96 
97 	arm_read(&ring, fds[0], use_mshot);
98 
99 	bid_bytes = 0;
100 	do {
101 		int write_size = sizeof(tmp);
102 
103 		if (write_size > nbytes)
104 			write_size = nbytes;
105 
106 		io_uring_get_events(&ring);
107 		ret = io_uring_peek_cqe(&ring, &cqe);
108 		if (!ret) {
109 			int this_bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
110 			if (bid == -1) {
111 				bid = this_bid;
112 			} else if (bid != this_bid) {
113 				if (bid_bytes != 2048) {
114 					fprintf(stderr, "unexpected bid bytes %d\n",
115 						bid_bytes);
116 					return 1;
117 				}
118 				bid = this_bid;
119 				bid_bytes = 0;
120 			}
121 			bid_bytes += cqe->res;
122 			nbytes -= cqe->res;
123 			if (!(cqe->flags & IORING_CQE_F_MORE))
124 				arm_read(&ring, fds[0], use_mshot);
125 			io_uring_cqe_seen(&ring, cqe);
126 			if (!nbytes)
127 				break;
128 		}
129 		usleep(1000);
130 		ret = write(fds[1], tmp, write_size);
131 		if (ret < 0) {
132 			perror("write");
133 			return 1;
134 		} else if (ret != write_size) {
135 			printf("short write %d\n", ret);
136 			return 1;
137 		}
138 	} while (nbytes);
139 
140 	if (bid_bytes) {
141 		if (bid_bytes != 2048) {
142 			fprintf(stderr, "unexpected bid bytes %d\n", bid_bytes);
143 			return 1;
144 		}
145 	}
146 
147 	io_uring_free_buf_ring(&ring, br, 32, BUF_BGID);
148 	io_uring_queue_exit(&ring);
149 	free(ptr);
150 	close(fds[0]);
151 	close(fds[1]);
152 	return 0;
153 }
154 
test_clamp(void)155 static int test_clamp(void)
156 {
157 	struct io_uring_buf_ring *br;
158 	struct io_uring_params p = { };
159 	struct io_uring_sqe *sqe;
160 	struct io_uring_cqe *cqe;
161 	struct io_uring ring;
162 	int ret, fds[2], i;
163 	char tmp[32];
164 	char *buf;
165 	void *ptr;
166 
167 	ret = io_uring_queue_init_params(4, &ring, &p);
168 	if (ret) {
169 		fprintf(stderr, "ring setup failed: %d\n", ret);
170 		return 1;
171 	}
172 
173 	if (pipe(fds) < 0) {
174 		perror("pipe");
175 		return 1;
176 	}
177 
178 	if (posix_memalign((void **) &buf, 4096, NR_BUFS * BUF_SIZE))
179 		return 1;
180 
181 	br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
182 	if (!br) {
183 		if (ret == -EINVAL) {
184 			no_buf_ring = 1;
185 			return 0;
186 		}
187 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
188 		return 1;
189 	}
190 
191 	ptr = buf;
192 	io_uring_buf_ring_add(br, buf, 16, 1, BR_MASK, 0);
193 	buf += 16;
194 	io_uring_buf_ring_add(br, buf, 32, 2, BR_MASK, 1);
195 	buf += 32;
196 	io_uring_buf_ring_add(br, buf, 32, 3, BR_MASK, 2);
197 	buf += 32;
198 	io_uring_buf_ring_add(br, buf, 32, 4, BR_MASK, 3);
199 	buf += 32;
200 	io_uring_buf_ring_advance(br, 4);
201 
202 	memset(tmp, 0xaa, sizeof(tmp));
203 
204 	sqe = io_uring_get_sqe(&ring);
205 	io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
206 
207 	ret = io_uring_submit(&ring);
208 	if (ret != 1) {
209 		fprintf(stderr, "submit: %d\n", ret);
210 		return 1;
211 	}
212 
213 	/* prevent pipe buffer merging */
214 	usleep(1000);
215 	ret = write(fds[1], tmp, 16);
216 
217 	usleep(1000);
218 	ret = write(fds[1], tmp, sizeof(tmp));
219 
220 	/* prevent pipe buffer merging */
221 	usleep(1000);
222 	ret = write(fds[1], tmp, 16);
223 
224 	usleep(1000);
225 	ret = write(fds[1], tmp, sizeof(tmp));
226 
227 	/*
228 	 * We should see a 16 byte completion, then a 32 byte, then a 16 byte,
229 	 * and finally a 32 byte again.
230 	 */
231 	for (i = 0; i < 4; i++) {
232 		ret = io_uring_wait_cqe(&ring, &cqe);
233 		if (ret) {
234 			fprintf(stderr, "wait cqe failed %d\n", ret);
235 			return 1;
236 		}
237 		if (cqe->res < 0) {
238 			fprintf(stderr, "cqe res: %d\n", cqe->res);
239 			return 1;
240 		}
241 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
242 			fprintf(stderr, "no more cqes\n");
243 			return 1;
244 		}
245 		if (i == 0 || i == 2) {
246 			if (cqe->res != 16) {
247 				fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
248 				return 1;
249 			}
250 		} else if (i == 1 || i == 3) {
251 			if (cqe->res != 32) {
252 				fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
253 				return 1;
254 			}
255 		}
256 		io_uring_cqe_seen(&ring, cqe);
257 	}
258 
259 	io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID);
260 	io_uring_queue_exit(&ring);
261 	free(ptr);
262 	return 0;
263 }
264 
test(int first_good,int async,int overflow,int incremental)265 static int test(int first_good, int async, int overflow, int incremental)
266 {
267 	struct io_uring_buf_ring *br;
268 	struct io_uring_params p = { };
269 	struct io_uring_sqe *sqe;
270 	struct io_uring_cqe *cqe;
271 	struct io_uring ring;
272 	int ret, fds[2], i, start_msg = 0;
273 	int br_flags = 0;
274 	char tmp[32];
275 	void *ptr[NR_BUFS];
276 	char *inc_index;
277 
278 	p.flags = IORING_SETUP_CQSIZE;
279 	if (!overflow)
280 		p.cq_entries = NR_BUFS + 1;
281 	else
282 		p.cq_entries = NR_OVERFLOW;
283 	ret = io_uring_queue_init_params(1, &ring, &p);
284 	if (ret) {
285 		fprintf(stderr, "ring setup failed: %d\n", ret);
286 		return 1;
287 	}
288 
289 	if (incremental) {
290 		if (no_buf_ring_inc)
291 			return 0;
292 		br_flags |= IOU_PBUF_RING_INC;
293 	}
294 
295 	br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, br_flags, &ret);
296 	if (!br) {
297 		if (ret == -EINVAL) {
298 			if (incremental) {
299 				no_buf_ring_inc = 1;
300 				return 0;
301 			}
302 			no_buf_ring = 1;
303 			return 0;
304 		}
305 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
306 		return 1;
307 	}
308 
309 	if (pipe(fds) < 0) {
310 		perror("pipe");
311 		return 1;
312 	}
313 
314 	if (!incremental) {
315 		for (i = 0; i < NR_BUFS; i++) {
316 			unsigned size = i <= 1 ? BUF_SIZE_FIRST : BUF_SIZE;
317 			ptr[i] = malloc(size);
318 			if (!ptr[i])
319 				return 1;
320 			io_uring_buf_ring_add(br, ptr[i], size, i + 1, BR_MASK, i);
321 		}
322 		inc_index = NULL;
323 		io_uring_buf_ring_advance(br, NR_BUFS);
324 	} else {
325 		inc_index = ptr[0] = malloc(NR_BUFS * BUF_SIZE);
326 		memset(inc_index, 0, NR_BUFS * BUF_SIZE);
327 		io_uring_buf_ring_add(br, ptr[0], NR_BUFS * BUF_SIZE, 1, BR_MASK, 0);
328 		io_uring_buf_ring_advance(br, 1);
329 	}
330 
331 	if (first_good) {
332 		sprintf(tmp, "this is buffer %d\n", start_msg++);
333 		ret = write(fds[1], tmp, strlen(tmp));
334 	}
335 
336 	sqe = io_uring_get_sqe(&ring);
337 	/* len == 0 means just use the defined provided buffer length */
338 	io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
339 	if (async)
340 		sqe->flags |= IOSQE_ASYNC;
341 
342 	ret = io_uring_submit(&ring);
343 	if (ret != 1) {
344 		fprintf(stderr, "submit: %d\n", ret);
345 		return 1;
346 	}
347 
348 	/* write NR_BUFS + 1, or if first_good is set, NR_BUFS */
349 	for (i = 0; i < NR_BUFS + !first_good; i++) {
350 		/* prevent pipe buffer merging */
351 		usleep(1000);
352 		sprintf(tmp, "this is buffer %d\n", i + start_msg);
353 		ret = write(fds[1], tmp, strlen(tmp));
354 		if (ret != strlen(tmp)) {
355 			fprintf(stderr, "write ret %d\n", ret);
356 			return 1;
357 		}
358 	}
359 
360 	for (i = 0; i < NR_BUFS + 1; i++) {
361 		int bid;
362 
363 		ret = io_uring_wait_cqe(&ring, &cqe);
364 		if (ret) {
365 			fprintf(stderr, "wait cqe failed %d\n", ret);
366 			return 1;
367 		}
368 		if (cqe->res < 0) {
369 			/* expected failure as we try to read one too many */
370 			if (cqe->res == -ENOBUFS && i == NR_BUFS)
371 				break;
372 			if (!i && cqe->res == -EINVAL) {
373 				no_read_mshot = 1;
374 				break;
375 			}
376 			fprintf(stderr, "%d: cqe res %d\n", i, cqe->res);
377 			return 1;
378 		} else if (i > 9 && cqe->res <= 17) {
379 			fprintf(stderr, "truncated message %d %d\n", i, cqe->res);
380 			return 1;
381 		}
382 
383 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
384 			fprintf(stderr, "no buffer selected\n");
385 			return 1;
386 		}
387 		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
388 		if (incremental && bid != 1) {
389 			fprintf(stderr, "bid %d for incremental\n", bid);
390 			return 1;
391 		}
392 		if (incremental && !first_good) {
393 			char out_buf[64];
394 			sprintf(out_buf, "this is buffer %d\n", i + start_msg);
395 			if (strncmp(inc_index, out_buf, strlen(out_buf)))
396 				return 1;
397 			inc_index += cqe->res;
398 		}
399 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
400 			/* we expect this on overflow */
401 			if (overflow && i >= NR_OVERFLOW)
402 				break;
403 			fprintf(stderr, "no more cqes\n");
404 			return 1;
405 		}
406 		/* should've overflown! */
407 		if (overflow && i > NR_OVERFLOW) {
408 			fprintf(stderr, "Expected overflow!\n");
409 			return 1;
410 		}
411 		io_uring_cqe_seen(&ring, cqe);
412 	}
413 
414 
415 	io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID);
416 	io_uring_queue_exit(&ring);
417 	if (incremental) {
418 		free(ptr[0]);
419 	} else {
420 		for (i = 0; i < NR_BUFS; i++)
421 			free(ptr[i]);
422 	}
423 	return 0;
424 }
425 
test_invalid(int async)426 static int test_invalid(int async)
427 {
428 	struct io_uring_buf_ring *br;
429 	struct io_uring_params p = { };
430 	struct io_uring_sqe *sqe;
431 	struct io_uring_cqe *cqe;
432 	struct io_uring ring;
433 	char fname[32] = ".mshot.%d.XXXXXX";
434 	int ret, fd;
435 	char *buf;
436 
437 	p.flags = IORING_SETUP_CQSIZE;
438 	p.cq_entries = NR_BUFS;
439 	ret = io_uring_queue_init_params(1, &ring, &p);
440 	if (ret) {
441 		fprintf(stderr, "ring setup failed: %d\n", ret);
442 		return 1;
443 	}
444 
445 	fd = mkstemp(fname);
446 	if (fd < 0) {
447 		perror("mkstemp");
448 		return 1;
449 	}
450 	unlink(fname);
451 
452 	if (posix_memalign((void **) &buf, 4096, BUF_SIZE))
453 		return 1;
454 
455 	br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret);
456 	if (!br) {
457 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
458 		return 1;
459 	}
460 
461 	io_uring_buf_ring_add(br, buf, BUF_SIZE, 1, BR_MASK, 0);
462 	io_uring_buf_ring_advance(br, 1);
463 
464 	sqe = io_uring_get_sqe(&ring);
465 	/* len == 0 means just use the defined provided buffer length */
466 	io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
467 	if (async)
468 		sqe->flags |= IOSQE_ASYNC;
469 
470 	ret = io_uring_submit(&ring);
471 	if (ret != 1) {
472 		fprintf(stderr, "submit: %d\n", ret);
473 		return 1;
474 	}
475 
476 	ret = io_uring_wait_cqe(&ring, &cqe);
477 	if (ret) {
478 		fprintf(stderr, "wait cqe failed %d\n", ret);
479 		return 1;
480 	}
481 	if (cqe->flags & IORING_CQE_F_MORE) {
482 		fprintf(stderr, "MORE flag set unexpected %d\n", cqe->flags);
483 		return 1;
484 	}
485 	if (cqe->res != -EBADFD) {
486 		fprintf(stderr, "Got cqe res %d, wanted -EBADFD\n", cqe->res);
487 		return 1;
488 	}
489 
490 	io_uring_cqe_seen(&ring, cqe);
491 	io_uring_free_buf_ring(&ring, br, 1, BUF_BGID);
492 	io_uring_queue_exit(&ring);
493 	free(buf);
494 	return 0;
495 }
496 
main(int argc,char * argv[])497 int main(int argc, char *argv[])
498 {
499 	int ret;
500 
501 	if (argc > 1)
502 		return T_EXIT_SKIP;
503 
504 	ret = test(0, 0, 0, 0);
505 	if (ret) {
506 		fprintf(stderr, "test 0 0 0 failed\n");
507 		return T_EXIT_FAIL;
508 	}
509 	if (no_buf_ring || no_read_mshot) {
510 		printf("skip\n");
511 		return T_EXIT_SKIP;
512 	}
513 
514 	ret = test(0, 1, 0, 0);
515 	if (ret) {
516 		fprintf(stderr, "test 0 1 0, failed\n");
517 		return T_EXIT_FAIL;
518 	}
519 
520 	ret = test(1, 0, 0, 0);
521 	if (ret) {
522 		fprintf(stderr, "test 1 0 0 failed\n");
523 		return T_EXIT_FAIL;
524 	}
525 
526 	ret = test(0, 0, 1, 0);
527 	if (ret) {
528 		fprintf(stderr, "test 0 0 1 failed\n");
529 		return T_EXIT_FAIL;
530 	}
531 
532 	ret = test(0, 1, 1, 0);
533 	if (ret) {
534 		fprintf(stderr, "test 0 1 1 failed\n");
535 		return T_EXIT_FAIL;
536 	}
537 
538 	ret = test(1, 0, 1, 0);
539 	if (ret) {
540 		fprintf(stderr, "test 1 0 1, failed\n");
541 		return T_EXIT_FAIL;
542 	}
543 
544 	ret = test(1, 0, 1, 0);
545 	if (ret) {
546 		fprintf(stderr, "test 1 0 1 failed\n");
547 		return T_EXIT_FAIL;
548 	}
549 
550 	ret = test(1, 1, 1, 0);
551 	if (ret) {
552 		fprintf(stderr, "test 1 1 1 failed\n");
553 		return T_EXIT_FAIL;
554 	}
555 
556 	ret = test(0, 0, 0, 1);
557 	if (ret) {
558 		fprintf(stderr, "test 0 0 0 1 failed\n");
559 		return T_EXIT_FAIL;
560 	}
561 
562 	ret = test(0, 0, 1, 1);
563 	if (ret) {
564 		fprintf(stderr, "test 0 0 1 1 failed\n");
565 		return T_EXIT_FAIL;
566 	}
567 
568 	ret = test(0, 1, 0, 1);
569 	if (ret) {
570 		fprintf(stderr, "test 0 1 0 1 failed\n");
571 		return T_EXIT_FAIL;
572 	}
573 
574 	ret = test(0, 1, 1, 1);
575 	if (ret) {
576 		fprintf(stderr, "test 0 1 1 1 failed\n");
577 		return T_EXIT_FAIL;
578 	}
579 
580 	ret = test(1, 0, 0, 1);
581 	if (ret) {
582 		fprintf(stderr, "test 1 0 0 1 failed\n");
583 		return T_EXIT_FAIL;
584 	}
585 
586 	ret = test(1, 0, 1, 1);
587 	if (ret) {
588 		fprintf(stderr, "test 1 0 1 1 failed\n");
589 		return T_EXIT_FAIL;
590 	}
591 
592 	ret = test(1, 1, 0, 1);
593 	if (ret) {
594 		fprintf(stderr, "test 1 1 0 1 failed\n");
595 		return T_EXIT_FAIL;
596 	}
597 
598 	ret = test(1, 1, 1, 1);
599 	if (ret) {
600 		fprintf(stderr, "test 1 1 1 1 failed\n");
601 		return T_EXIT_FAIL;
602 	}
603 
604 	ret = test_invalid(0);
605 	if (ret) {
606 		fprintf(stderr, "test_invalid 0 failed\n");
607 		return T_EXIT_FAIL;
608 	}
609 
610 	ret = test_invalid(1);
611 	if (ret) {
612 		fprintf(stderr, "test_invalid 1 failed\n");
613 		return T_EXIT_FAIL;
614 	}
615 
616 	ret = test_clamp();
617 	if (ret) {
618 		fprintf(stderr, "test_clamp failed\n");
619 		return T_EXIT_FAIL;
620 	}
621 
622 	ret = test_inc(0, 0);
623 	if (ret) {
624 		fprintf(stderr, "test_inc 0 0 failed\n");
625 		return T_EXIT_FAIL;
626 	}
627 
628 	ret = test_inc(0, IORING_SETUP_SQPOLL);
629 	if (ret) {
630 		fprintf(stderr, "test_inc 0 sqpoll failed\n");
631 		return T_EXIT_FAIL;
632 	}
633 
634 	ret = test_inc(0, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
635 	if (ret) {
636 		fprintf(stderr, "test_inc 0 defer failed\n");
637 		return T_EXIT_FAIL;
638 	}
639 
640 	ret = test_inc(1, 0);
641 	if (ret) {
642 		fprintf(stderr, "test_inc 1 0 failed\n");
643 		return T_EXIT_FAIL;
644 	}
645 
646 	ret = test_inc(1, IORING_SETUP_SQPOLL);
647 	if (ret) {
648 		fprintf(stderr, "test_inc 1 sqpoll failed\n");
649 		return T_EXIT_FAIL;
650 	}
651 
652 	ret = test_inc(1, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
653 	if (ret) {
654 		fprintf(stderr, "test_inc 1 defer failed\n");
655 		return T_EXIT_FAIL;
656 	}
657 
658 	return T_EXIT_PASS;
659 }
660