1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Description: test multishot read (IORING_OP_READ_MULTISHOT) on pipes,
4 * using ring provided buffers
5 *
6 */
7 #include <errno.h>
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <fcntl.h>
13
14 #include "liburing.h"
15 #include "helpers.h"
16
/* Size in bytes of a regular provided buffer */
#define BUF_SIZE 32
/* Size in bytes of the first two provided buffers handed out by test() */
#define BUF_SIZE_FIRST 17
/* Number of entries in the provided buffer ring */
#define NR_BUFS 64
/* Buffer group ID the buffer ring is registered under */
#define BUF_BGID 1

/* Index mask passed to io_uring_buf_ring_add() for a NR_BUFS sized ring */
#define BR_MASK (NR_BUFS - 1)

/* CQ ring size requested by test() when it is run in overflow mode */
#define NR_OVERFLOW (NR_BUFS / 4)

/*
 * Feature detection flags: no_buf_ring is set when buffer ring setup fails
 * with -EINVAL, no_read_mshot when the very first completion in test() is
 * -EINVAL. main() skips the test if either is set.
 */
static int no_buf_ring, no_read_mshot;
27
test_clamp(void)28 static int test_clamp(void)
29 {
30 struct io_uring_buf_ring *br;
31 struct io_uring_params p = { };
32 struct io_uring_sqe *sqe;
33 struct io_uring_cqe *cqe;
34 struct io_uring ring;
35 int ret, fds[2], i;
36 char tmp[32];
37 char *buf;
38 void *ptr;
39
40 ret = io_uring_queue_init_params(4, &ring, &p);
41 if (ret) {
42 fprintf(stderr, "ring setup failed: %d\n", ret);
43 return 1;
44 }
45
46 if (pipe(fds) < 0) {
47 perror("pipe");
48 return 1;
49 }
50
51 if (posix_memalign((void **) &buf, 4096, NR_BUFS * BUF_SIZE))
52 return 1;
53
54 br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
55 if (!br) {
56 if (ret == -EINVAL) {
57 no_buf_ring = 1;
58 return 0;
59 }
60 fprintf(stderr, "Buffer ring register failed %d\n", ret);
61 return 1;
62 }
63
64 ptr = buf;
65 io_uring_buf_ring_add(br, buf, 16, 1, BR_MASK, 0);
66 buf += 16;
67 io_uring_buf_ring_add(br, buf, 32, 2, BR_MASK, 1);
68 buf += 32;
69 io_uring_buf_ring_add(br, buf, 32, 3, BR_MASK, 2);
70 buf += 32;
71 io_uring_buf_ring_add(br, buf, 32, 4, BR_MASK, 3);
72 buf += 32;
73 io_uring_buf_ring_advance(br, 4);
74
75 memset(tmp, 0xaa, sizeof(tmp));
76
77 sqe = io_uring_get_sqe(&ring);
78 io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
79
80 ret = io_uring_submit(&ring);
81 if (ret != 1) {
82 fprintf(stderr, "submit: %d\n", ret);
83 return 1;
84 }
85
86 /* prevent pipe buffer merging */
87 usleep(1000);
88 ret = write(fds[1], tmp, 16);
89
90 usleep(1000);
91 ret = write(fds[1], tmp, sizeof(tmp));
92
93 /* prevent pipe buffer merging */
94 usleep(1000);
95 ret = write(fds[1], tmp, 16);
96
97 usleep(1000);
98 ret = write(fds[1], tmp, sizeof(tmp));
99
100 /*
101 * We should see a 16 byte completion, then a 32 byte, then a 16 byte,
102 * and finally a 32 byte again.
103 */
104 for (i = 0; i < 4; i++) {
105 ret = io_uring_wait_cqe(&ring, &cqe);
106 if (ret) {
107 fprintf(stderr, "wait cqe failed %d\n", ret);
108 return 1;
109 }
110 if (cqe->res < 0) {
111 fprintf(stderr, "cqe res: %d\n", cqe->res);
112 return 1;
113 }
114 if (!(cqe->flags & IORING_CQE_F_MORE)) {
115 fprintf(stderr, "no more cqes\n");
116 return 1;
117 }
118 if (i == 0 || i == 2) {
119 if (cqe->res != 16) {
120 fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
121 return 1;
122 }
123 } else if (i == 1 || i == 3) {
124 if (cqe->res != 32) {
125 fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
126 return 1;
127 }
128 }
129 io_uring_cqe_seen(&ring, cqe);
130 }
131
132 io_uring_queue_exit(&ring);
133 free(ptr);
134 return 0;
135 }
136
test(int first_good,int async,int overflow)137 static int test(int first_good, int async, int overflow)
138 {
139 struct io_uring_buf_ring *br;
140 struct io_uring_params p = { };
141 struct io_uring_sqe *sqe;
142 struct io_uring_cqe *cqe;
143 struct io_uring ring;
144 int ret, fds[2], i;
145 char tmp[32];
146 void *ptr[NR_BUFS];
147
148 p.flags = IORING_SETUP_CQSIZE;
149 if (!overflow)
150 p.cq_entries = NR_BUFS + 1;
151 else
152 p.cq_entries = NR_OVERFLOW;
153 ret = io_uring_queue_init_params(1, &ring, &p);
154 if (ret) {
155 fprintf(stderr, "ring setup failed: %d\n", ret);
156 return 1;
157 }
158
159 if (pipe(fds) < 0) {
160 perror("pipe");
161 return 1;
162 }
163
164 br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
165 if (!br) {
166 if (ret == -EINVAL) {
167 no_buf_ring = 1;
168 return 0;
169 }
170 fprintf(stderr, "Buffer ring register failed %d\n", ret);
171 return 1;
172 }
173
174 for (i = 0; i < NR_BUFS; i++) {
175 unsigned size = i <= 1 ? BUF_SIZE_FIRST : BUF_SIZE;
176 ptr[i] = malloc(size);
177 if (!ptr[i])
178 return 1;
179 io_uring_buf_ring_add(br, ptr[i], size, i + 1, BR_MASK, i);
180 }
181 io_uring_buf_ring_advance(br, NR_BUFS);
182
183 if (first_good) {
184 sprintf(tmp, "this is buffer %d\n", 0);
185 ret = write(fds[1], tmp, strlen(tmp));
186 }
187
188 sqe = io_uring_get_sqe(&ring);
189 /* len == 0 means just use the defined provided buffer length */
190 io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
191 if (async)
192 sqe->flags |= IOSQE_ASYNC;
193
194 ret = io_uring_submit(&ring);
195 if (ret != 1) {
196 fprintf(stderr, "submit: %d\n", ret);
197 return 1;
198 }
199
200 /* write NR_BUFS + 1, or if first_good is set, NR_BUFS */
201 for (i = 0; i < NR_BUFS + !first_good; i++) {
202 /* prevent pipe buffer merging */
203 usleep(1000);
204 sprintf(tmp, "this is buffer %d\n", i + 1);
205 ret = write(fds[1], tmp, strlen(tmp));
206 if (ret != strlen(tmp)) {
207 fprintf(stderr, "write ret %d\n", ret);
208 return 1;
209 }
210 }
211
212 for (i = 0; i < NR_BUFS + 1; i++) {
213 ret = io_uring_wait_cqe(&ring, &cqe);
214 if (ret) {
215 fprintf(stderr, "wait cqe failed %d\n", ret);
216 return 1;
217 }
218 if (cqe->res < 0) {
219 /* expected failure as we try to read one too many */
220 if (cqe->res == -ENOBUFS && i == NR_BUFS)
221 break;
222 if (!i && cqe->res == -EINVAL) {
223 no_read_mshot = 1;
224 break;
225 }
226 fprintf(stderr, "%d: cqe res %d\n", i, cqe->res);
227 return 1;
228 } else if (i > 9 && cqe->res <= 17) {
229 fprintf(stderr, "truncated message %d %d\n", i, cqe->res);
230 return 1;
231 }
232
233 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
234 fprintf(stderr, "no buffer selected\n");
235 return 1;
236 }
237 if (!(cqe->flags & IORING_CQE_F_MORE)) {
238 /* we expect this on overflow */
239 if (overflow && i >= NR_OVERFLOW)
240 break;
241 fprintf(stderr, "no more cqes\n");
242 return 1;
243 }
244 /* should've overflown! */
245 if (overflow && i > NR_OVERFLOW) {
246 fprintf(stderr, "Expected overflow!\n");
247 return 1;
248 }
249 io_uring_cqe_seen(&ring, cqe);
250 }
251
252 io_uring_queue_exit(&ring);
253 for (i = 0; i < NR_BUFS; i++)
254 free(ptr[i]);
255 return 0;
256 }
257
test_invalid(int async)258 static int test_invalid(int async)
259 {
260 struct io_uring_buf_ring *br;
261 struct io_uring_params p = { };
262 struct io_uring_sqe *sqe;
263 struct io_uring_cqe *cqe;
264 struct io_uring ring;
265 char fname[32] = ".mshot.%d.XXXXXX";
266 int ret, fd;
267 char *buf;
268
269 p.flags = IORING_SETUP_CQSIZE;
270 p.cq_entries = NR_BUFS;
271 ret = io_uring_queue_init_params(1, &ring, &p);
272 if (ret) {
273 fprintf(stderr, "ring setup failed: %d\n", ret);
274 return 1;
275 }
276
277 fd = mkstemp(fname);
278 if (fd < 0) {
279 perror("mkstemp");
280 return 1;
281 }
282 unlink(fname);
283
284 if (posix_memalign((void **) &buf, 4096, BUF_SIZE))
285 return 1;
286
287 br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret);
288 if (!br) {
289 fprintf(stderr, "Buffer ring register failed %d\n", ret);
290 return 1;
291 }
292
293 io_uring_buf_ring_add(br, buf, BUF_SIZE, 1, BR_MASK, 0);
294 io_uring_buf_ring_advance(br, 1);
295
296 sqe = io_uring_get_sqe(&ring);
297 /* len == 0 means just use the defined provided buffer length */
298 io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
299 if (async)
300 sqe->flags |= IOSQE_ASYNC;
301
302 ret = io_uring_submit(&ring);
303 if (ret != 1) {
304 fprintf(stderr, "submit: %d\n", ret);
305 return 1;
306 }
307
308 ret = io_uring_wait_cqe(&ring, &cqe);
309 if (ret) {
310 fprintf(stderr, "wait cqe failed %d\n", ret);
311 return 1;
312 }
313 if (cqe->flags & IORING_CQE_F_MORE) {
314 fprintf(stderr, "MORE flag set unexpected %d\n", cqe->flags);
315 return 1;
316 }
317 if (cqe->res != -EBADFD) {
318 fprintf(stderr, "Got cqe res %d, wanted -EBADFD\n", cqe->res);
319 return 1;
320 }
321
322 io_uring_cqe_seen(&ring, cqe);
323 io_uring_queue_exit(&ring);
324 free(buf);
325 return 0;
326 }
327
main(int argc,char * argv[])328 int main(int argc, char *argv[])
329 {
330 int ret;
331
332 if (argc > 1)
333 return T_EXIT_SKIP;
334
335 ret = test(0, 0, 0);
336 if (ret) {
337 fprintf(stderr, "test 0 0 0 failed\n");
338 return T_EXIT_FAIL;
339 }
340 if (no_buf_ring || no_read_mshot)
341 return T_EXIT_SKIP;
342
343 ret = test(0, 1, 0);
344 if (ret) {
345 fprintf(stderr, "test 0 1 0, failed\n");
346 return T_EXIT_FAIL;
347 }
348
349 ret = test(1, 0, 0);
350 if (ret) {
351 fprintf(stderr, "test 1 0 0 failed\n");
352 return T_EXIT_FAIL;
353 }
354
355 ret = test(0, 0, 1);
356 if (ret) {
357 fprintf(stderr, "test 0 0 1 failed\n");
358 return T_EXIT_FAIL;
359 }
360
361 ret = test(0, 1, 1);
362 if (ret) {
363 fprintf(stderr, "test 0 1 1 failed\n");
364 return T_EXIT_FAIL;
365 }
366
367 ret = test(1, 0, 1);
368 if (ret) {
369 fprintf(stderr, "test 1 0 1, failed\n");
370 return T_EXIT_FAIL;
371 }
372
373 ret = test(1, 0, 1);
374 if (ret) {
375 fprintf(stderr, "test 1 0 1 failed\n");
376 return T_EXIT_FAIL;
377 }
378
379 ret = test(1, 1, 1);
380 if (ret) {
381 fprintf(stderr, "test 1 1 1 failed\n");
382 return T_EXIT_FAIL;
383 }
384
385 ret = test_invalid(0);
386 if (ret) {
387 fprintf(stderr, "test_invalid 0 failed\n");
388 return T_EXIT_FAIL;
389 }
390
391 ret = test_invalid(1);
392 if (ret) {
393 fprintf(stderr, "test_invalid 1 failed\n");
394 return T_EXIT_FAIL;
395 }
396
397 ret = test_clamp();
398 if (ret) {
399 fprintf(stderr, "test_clamp failed\n");
400 return T_EXIT_FAIL;
401 }
402
403 return T_EXIT_PASS;
404 }
405