• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Description: test multishot read (IORING_OP_READ_MULTISHOT) on pipes,
4  *		using ring provided buffers
5  *
6  */
7 #include <errno.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <fcntl.h>
13 
14 #include "liburing.h"
15 #include "helpers.h"
16 
/* Size in bytes of a regular provided buffer */
#define BUF_SIZE	32
/*
 * Size in bytes of the first two provided buffers in test(); this is exactly
 * the length of a "this is buffer N\n" message with a single digit N.
 */
#define BUF_SIZE_FIRST	17
/* Number of entries in the provided buffer ring */
#define NR_BUFS		64
/* Buffer group ID the buffer ring is registered under */
#define BUF_BGID	1

/* Index mask for a buffer ring of NR_BUFS entries (NR_BUFS is a power of 2) */
#define BR_MASK		(NR_BUFS - 1)

/* CQ ring size requested by test() when the overflow variant is run */
#define NR_OVERFLOW	(NR_BUFS / 4)

/*
 * Set by the tests when setting up the buffer ring fails with -EINVAL
 * (no_buf_ring) or when the very first multishot read CQE is -EINVAL
 * (no_read_mshot); main() skips the remaining tests if either is set.
 */
static int no_buf_ring, no_read_mshot;
27 
test_clamp(void)28 static int test_clamp(void)
29 {
30 	struct io_uring_buf_ring *br;
31 	struct io_uring_params p = { };
32 	struct io_uring_sqe *sqe;
33 	struct io_uring_cqe *cqe;
34 	struct io_uring ring;
35 	int ret, fds[2], i;
36 	char tmp[32];
37 	char *buf;
38 	void *ptr;
39 
40 	ret = io_uring_queue_init_params(4, &ring, &p);
41 	if (ret) {
42 		fprintf(stderr, "ring setup failed: %d\n", ret);
43 		return 1;
44 	}
45 
46 	if (pipe(fds) < 0) {
47 		perror("pipe");
48 		return 1;
49 	}
50 
51 	if (posix_memalign((void **) &buf, 4096, NR_BUFS * BUF_SIZE))
52 		return 1;
53 
54 	br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
55 	if (!br) {
56 		if (ret == -EINVAL) {
57 			no_buf_ring = 1;
58 			return 0;
59 		}
60 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
61 		return 1;
62 	}
63 
64 	ptr = buf;
65 	io_uring_buf_ring_add(br, buf, 16, 1, BR_MASK, 0);
66 	buf += 16;
67 	io_uring_buf_ring_add(br, buf, 32, 2, BR_MASK, 1);
68 	buf += 32;
69 	io_uring_buf_ring_add(br, buf, 32, 3, BR_MASK, 2);
70 	buf += 32;
71 	io_uring_buf_ring_add(br, buf, 32, 4, BR_MASK, 3);
72 	buf += 32;
73 	io_uring_buf_ring_advance(br, 4);
74 
75 	memset(tmp, 0xaa, sizeof(tmp));
76 
77 	sqe = io_uring_get_sqe(&ring);
78 	io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
79 
80 	ret = io_uring_submit(&ring);
81 	if (ret != 1) {
82 		fprintf(stderr, "submit: %d\n", ret);
83 		return 1;
84 	}
85 
86 	/* prevent pipe buffer merging */
87 	usleep(1000);
88 	ret = write(fds[1], tmp, 16);
89 
90 	usleep(1000);
91 	ret = write(fds[1], tmp, sizeof(tmp));
92 
93 	/* prevent pipe buffer merging */
94 	usleep(1000);
95 	ret = write(fds[1], tmp, 16);
96 
97 	usleep(1000);
98 	ret = write(fds[1], tmp, sizeof(tmp));
99 
100 	/*
101 	 * We should see a 16 byte completion, then a 32 byte, then a 16 byte,
102 	 * and finally a 32 byte again.
103 	 */
104 	for (i = 0; i < 4; i++) {
105 		ret = io_uring_wait_cqe(&ring, &cqe);
106 		if (ret) {
107 			fprintf(stderr, "wait cqe failed %d\n", ret);
108 			return 1;
109 		}
110 		if (cqe->res < 0) {
111 			fprintf(stderr, "cqe res: %d\n", cqe->res);
112 			return 1;
113 		}
114 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
115 			fprintf(stderr, "no more cqes\n");
116 			return 1;
117 		}
118 		if (i == 0 || i == 2) {
119 			if (cqe->res != 16) {
120 				fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
121 				return 1;
122 			}
123 		} else if (i == 1 || i == 3) {
124 			if (cqe->res != 32) {
125 				fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
126 				return 1;
127 			}
128 		}
129 		io_uring_cqe_seen(&ring, cqe);
130 	}
131 
132 	io_uring_queue_exit(&ring);
133 	free(ptr);
134 	return 0;
135 }
136 
test(int first_good,int async,int overflow)137 static int test(int first_good, int async, int overflow)
138 {
139 	struct io_uring_buf_ring *br;
140 	struct io_uring_params p = { };
141 	struct io_uring_sqe *sqe;
142 	struct io_uring_cqe *cqe;
143 	struct io_uring ring;
144 	int ret, fds[2], i;
145 	char tmp[32];
146 	void *ptr[NR_BUFS];
147 
148 	p.flags = IORING_SETUP_CQSIZE;
149 	if (!overflow)
150 		p.cq_entries = NR_BUFS + 1;
151 	else
152 		p.cq_entries = NR_OVERFLOW;
153 	ret = io_uring_queue_init_params(1, &ring, &p);
154 	if (ret) {
155 		fprintf(stderr, "ring setup failed: %d\n", ret);
156 		return 1;
157 	}
158 
159 	if (pipe(fds) < 0) {
160 		perror("pipe");
161 		return 1;
162 	}
163 
164 	br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
165 	if (!br) {
166 		if (ret == -EINVAL) {
167 			no_buf_ring = 1;
168 			return 0;
169 		}
170 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
171 		return 1;
172 	}
173 
174 	for (i = 0; i < NR_BUFS; i++) {
175 		unsigned size = i <= 1 ? BUF_SIZE_FIRST : BUF_SIZE;
176 		ptr[i] = malloc(size);
177 		if (!ptr[i])
178 			return 1;
179 		io_uring_buf_ring_add(br, ptr[i], size, i + 1, BR_MASK, i);
180 	}
181 	io_uring_buf_ring_advance(br, NR_BUFS);
182 
183 	if (first_good) {
184 		sprintf(tmp, "this is buffer %d\n", 0);
185 		ret = write(fds[1], tmp, strlen(tmp));
186 	}
187 
188 	sqe = io_uring_get_sqe(&ring);
189 	/* len == 0 means just use the defined provided buffer length */
190 	io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
191 	if (async)
192 		sqe->flags |= IOSQE_ASYNC;
193 
194 	ret = io_uring_submit(&ring);
195 	if (ret != 1) {
196 		fprintf(stderr, "submit: %d\n", ret);
197 		return 1;
198 	}
199 
200 	/* write NR_BUFS + 1, or if first_good is set, NR_BUFS */
201 	for (i = 0; i < NR_BUFS + !first_good; i++) {
202 		/* prevent pipe buffer merging */
203 		usleep(1000);
204 		sprintf(tmp, "this is buffer %d\n", i + 1);
205 		ret = write(fds[1], tmp, strlen(tmp));
206 		if (ret != strlen(tmp)) {
207 			fprintf(stderr, "write ret %d\n", ret);
208 			return 1;
209 		}
210 	}
211 
212 	for (i = 0; i < NR_BUFS + 1; i++) {
213 		ret = io_uring_wait_cqe(&ring, &cqe);
214 		if (ret) {
215 			fprintf(stderr, "wait cqe failed %d\n", ret);
216 			return 1;
217 		}
218 		if (cqe->res < 0) {
219 			/* expected failure as we try to read one too many */
220 			if (cqe->res == -ENOBUFS && i == NR_BUFS)
221 				break;
222 			if (!i && cqe->res == -EINVAL) {
223 				no_read_mshot = 1;
224 				break;
225 			}
226 			fprintf(stderr, "%d: cqe res %d\n", i, cqe->res);
227 			return 1;
228 		} else if (i > 9 && cqe->res <= 17) {
229 			fprintf(stderr, "truncated message %d %d\n", i, cqe->res);
230 			return 1;
231 		}
232 
233 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
234 			fprintf(stderr, "no buffer selected\n");
235 			return 1;
236 		}
237 		if (!(cqe->flags & IORING_CQE_F_MORE)) {
238 			/* we expect this on overflow */
239 			if (overflow && i >= NR_OVERFLOW)
240 				break;
241 			fprintf(stderr, "no more cqes\n");
242 			return 1;
243 		}
244 		/* should've overflown! */
245 		if (overflow && i > NR_OVERFLOW) {
246 			fprintf(stderr, "Expected overflow!\n");
247 			return 1;
248 		}
249 		io_uring_cqe_seen(&ring, cqe);
250 	}
251 
252 	io_uring_queue_exit(&ring);
253 	for (i = 0; i < NR_BUFS; i++)
254 		free(ptr[i]);
255 	return 0;
256 }
257 
test_invalid(int async)258 static int test_invalid(int async)
259 {
260 	struct io_uring_buf_ring *br;
261 	struct io_uring_params p = { };
262 	struct io_uring_sqe *sqe;
263 	struct io_uring_cqe *cqe;
264 	struct io_uring ring;
265 	char fname[32] = ".mshot.%d.XXXXXX";
266 	int ret, fd;
267 	char *buf;
268 
269 	p.flags = IORING_SETUP_CQSIZE;
270 	p.cq_entries = NR_BUFS;
271 	ret = io_uring_queue_init_params(1, &ring, &p);
272 	if (ret) {
273 		fprintf(stderr, "ring setup failed: %d\n", ret);
274 		return 1;
275 	}
276 
277 	fd = mkstemp(fname);
278 	if (fd < 0) {
279 		perror("mkstemp");
280 		return 1;
281 	}
282 	unlink(fname);
283 
284 	if (posix_memalign((void **) &buf, 4096, BUF_SIZE))
285 		return 1;
286 
287 	br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret);
288 	if (!br) {
289 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
290 		return 1;
291 	}
292 
293 	io_uring_buf_ring_add(br, buf, BUF_SIZE, 1, BR_MASK, 0);
294 	io_uring_buf_ring_advance(br, 1);
295 
296 	sqe = io_uring_get_sqe(&ring);
297 	/* len == 0 means just use the defined provided buffer length */
298 	io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
299 	if (async)
300 		sqe->flags |= IOSQE_ASYNC;
301 
302 	ret = io_uring_submit(&ring);
303 	if (ret != 1) {
304 		fprintf(stderr, "submit: %d\n", ret);
305 		return 1;
306 	}
307 
308 	ret = io_uring_wait_cqe(&ring, &cqe);
309 	if (ret) {
310 		fprintf(stderr, "wait cqe failed %d\n", ret);
311 		return 1;
312 	}
313 	if (cqe->flags & IORING_CQE_F_MORE) {
314 		fprintf(stderr, "MORE flag set unexpected %d\n", cqe->flags);
315 		return 1;
316 	}
317 	if (cqe->res != -EBADFD) {
318 		fprintf(stderr, "Got cqe res %d, wanted -EBADFD\n", cqe->res);
319 		return 1;
320 	}
321 
322 	io_uring_cqe_seen(&ring, cqe);
323 	io_uring_queue_exit(&ring);
324 	free(buf);
325 	return 0;
326 }
327 
main(int argc,char * argv[])328 int main(int argc, char *argv[])
329 {
330 	int ret;
331 
332 	if (argc > 1)
333 		return T_EXIT_SKIP;
334 
335 	ret = test(0, 0, 0);
336 	if (ret) {
337 		fprintf(stderr, "test 0 0 0 failed\n");
338 		return T_EXIT_FAIL;
339 	}
340 	if (no_buf_ring || no_read_mshot)
341 		return T_EXIT_SKIP;
342 
343 	ret = test(0, 1, 0);
344 	if (ret) {
345 		fprintf(stderr, "test 0 1 0, failed\n");
346 		return T_EXIT_FAIL;
347 	}
348 
349 	ret = test(1, 0, 0);
350 	if (ret) {
351 		fprintf(stderr, "test 1 0 0 failed\n");
352 		return T_EXIT_FAIL;
353 	}
354 
355 	ret = test(0, 0, 1);
356 	if (ret) {
357 		fprintf(stderr, "test 0 0 1 failed\n");
358 		return T_EXIT_FAIL;
359 	}
360 
361 	ret = test(0, 1, 1);
362 	if (ret) {
363 		fprintf(stderr, "test 0 1 1 failed\n");
364 		return T_EXIT_FAIL;
365 	}
366 
367 	ret = test(1, 0, 1);
368 	if (ret) {
369 		fprintf(stderr, "test 1 0 1, failed\n");
370 		return T_EXIT_FAIL;
371 	}
372 
373 	ret = test(1, 0, 1);
374 	if (ret) {
375 		fprintf(stderr, "test 1 0 1 failed\n");
376 		return T_EXIT_FAIL;
377 	}
378 
379 	ret = test(1, 1, 1);
380 	if (ret) {
381 		fprintf(stderr, "test 1 1 1 failed\n");
382 		return T_EXIT_FAIL;
383 	}
384 
385 	ret = test_invalid(0);
386 	if (ret) {
387 		fprintf(stderr, "test_invalid 0 failed\n");
388 		return T_EXIT_FAIL;
389 	}
390 
391 	ret = test_invalid(1);
392 	if (ret) {
393 		fprintf(stderr, "test_invalid 1 failed\n");
394 		return T_EXIT_FAIL;
395 	}
396 
397 	ret = test_clamp();
398 	if (ret) {
399 		fprintf(stderr, "test_clamp failed\n");
400 		return T_EXIT_FAIL;
401 	}
402 
403 	return T_EXIT_PASS;
404 }
405