// SPDX-License-Identifier: MIT
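/*
 * Test multishot recv and recvmsg on stream and datagram socket pairs,
 * using provided buffers, with several injected failure modes: too few
 * buffers, early close of either end, and CQ ring overflow.
 */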

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <assert.h>

#include "liburing.h"
#include "helpers.h"
/* fake errno used to signal that multishot recv is not supported */
#define ENORECVMULTISHOT 9999

enum early_error_t {
	ERROR_NONE = 0,
	ERROR_NOT_ENOUGH_BUFFERS,
	ERROR_EARLY_CLOSE_SENDER,
	ERROR_EARLY_CLOSE_RECEIVER,
	ERROR_EARLY_OVERFLOW,
	ERROR_EARLY_LAST
};

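/* one test configuration; main() runs every combination of these flags */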
struct args {
	bool stream;
	bool wait_each;
	bool recvmsg;
	enum early_error_t early_error;
	bool defer;
};

static int check_sockaddr(struct sockaddr_in *in)
{
	struct in_addr expected;

	inet_pton(AF_INET, "127.0.0.1", &expected);
	if (in->sin_family != AF_INET) {
		fprintf(stderr, "bad family %d\n", (int)in->sin_family);
		return -1;
	}
	if (memcmp(&expected, &in->sin_addr, sizeof(in->sin_addr))) {
		char buff[256];
		const char *addr = inet_ntop(AF_INET, &in->sin_addr, buff, sizeof(buff));

		fprintf(stderr, "unexpected address %s\n", addr ? addr : "INVALID");
		return -1;
	}
	return 0;
}

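/*
 * Core test: arm one multishot recv/recvmsg on fds[0], send a series of
 * payloads of increasing size from fds[1] (optionally injecting an early
 * error), then validate the resulting CQE stream.
 */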
static int test(struct args *args)
{
	int const N = 8;
	int const N_BUFFS = N * 64;
	int const N_CQE_OVERFLOW = 4;
	int const min_cqes = args->early_error ? 2 : 8;
	int const NAME_LEN = sizeof(struct sockaddr_storage);
	int const CONTROL_LEN = CMSG_ALIGN(sizeof(struct sockaddr_storage))
				+ sizeof(struct cmsghdr);
	struct io_uring ring;
	struct io_uring_cqe *cqe;
	struct io_uring_sqe *sqe;
	int fds[2], ret, i, j;
	int total_sent_bytes = 0, total_recv_bytes = 0, total_dropped_bytes = 0;
	int send_buff[256];
	int *sent_buffs[N_BUFFS];
	int *recv_buffs[N_BUFFS];
	int *at;
	struct io_uring_cqe recv_cqe[N_BUFFS];
	int recv_cqes = 0;
	bool early_error = false;
	bool early_error_started = false;
	struct __kernel_timespec timeout = {
		.tv_sec = 1,
	};
	struct msghdr msg;
	struct io_uring_params params = { };
	int n_sqe = 32;

	memset(recv_buffs, 0, sizeof(recv_buffs));

	if (args->defer)
		params.flags |= IORING_SETUP_SINGLE_ISSUER |
				IORING_SETUP_DEFER_TASKRUN;

	if (args->early_error == ERROR_EARLY_OVERFLOW) {
		params.flags |= IORING_SETUP_CQSIZE;
		params.cq_entries = N_CQE_OVERFLOW;
		n_sqe = N_CQE_OVERFLOW;
	}

	ret = io_uring_queue_init_params(n_sqe, &ring, &params);
	if (ret) {
		fprintf(stderr, "queue init failed: %d\n", ret);
		return ret;
	}

	ret = t_create_socket_pair(fds, args->stream);
	if (ret) {
		fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
		return ret;
	}

	if (!args->stream) {
		bool val = true;

		/* force some cmsgs to come back to us */
		ret = setsockopt(fds[0], IPPROTO_IP, IP_RECVORIGDSTADDR, &val,
				 sizeof(val));
		if (ret) {
			fprintf(stderr, "setsockopt failed %d\n", errno);
			goto cleanup;
		}
	}

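	/*
	 * Fill the send buffer with a known pattern and provide receive
	 * buffers of assorted sizes to buffer group 7, one SQE per buffer.
	 */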
	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
		send_buff[i] = i;

	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
		/* prepare some different sized buffers */
		int buffer_size = (i % 2 == 0 && (args->stream || args->recvmsg)) ? 1 : N;

		buffer_size *= sizeof(int);
		if (args->recvmsg) {
			buffer_size +=
				sizeof(struct io_uring_recvmsg_out) +
				NAME_LEN +
				CONTROL_LEN;
		}

		recv_buffs[i] = malloc(buffer_size);

		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
			continue;

		sqe = io_uring_get_sqe(&ring);
		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
					      buffer_size, 1, 7, i);
		io_uring_sqe_set_data64(sqe, 0x999);
		memset(recv_buffs[i], 0xcc, buffer_size);
		ret = io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL);
		if (ret < 0) {
			fprintf(stderr, "provide buffers failed: %d\n", ret);
			ret = -1;
			goto cleanup;
		}
		io_uring_cqe_seen(&ring, cqe);
	}

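	/*
	 * Arm a single multishot receive. With IOSQE_BUFFER_SELECT the kernel
	 * picks a buffer from group 7 for each message, and keeps posting
	 * CQEs (flagged IORING_CQE_F_MORE) until the request terminates.
	 */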
	sqe = io_uring_get_sqe(&ring);
	if (args->recvmsg) {
		unsigned int flags = 0;

		if (!args->stream)
			flags |= MSG_TRUNC;

		memset(&msg, 0, sizeof(msg));
		msg.msg_namelen = NAME_LEN;
		msg.msg_controllen = CONTROL_LEN;
		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, flags);
	} else {
		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
	}
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->buf_group = 7;
	io_uring_sqe_set_data64(sqe, 1234);
	io_uring_submit(&ring);

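	/*
	 * Send N payloads of increasing size. After the third send, inject
	 * the configured early error: cancel the receive, or shut down and
	 * close the sender.
	 */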
	at = &send_buff[0];
	total_sent_bytes = 0;
	for (i = 0; i < N; i++) {
		int to_send = sizeof(*at) * (i + 1);

		total_sent_bytes += to_send;
		sent_buffs[i] = at;
		if (send(fds[1], at, to_send, 0) != to_send) {
			if (early_error_started)
				break;
			fprintf(stderr, "send failed %d\n", errno);
			ret = -1;
			goto cleanup;
		}

		if (i == 2) {
			if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
				/* allow previous sends to complete */
				usleep(1000);
				io_uring_get_events(&ring);

				/* cancel the multishot recv by its user_data */
				sqe = io_uring_get_sqe(&ring);
				io_uring_prep_cancel64(sqe, 1234, 0);
				io_uring_sqe_set_data64(sqe, 0x888);
				sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
				io_uring_submit(&ring);
				early_error_started = true;

				/* allow the cancel to complete */
				usleep(1000);
				io_uring_get_events(&ring);
			}
			if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
				early_error_started = true;
				shutdown(fds[1], SHUT_RDWR);
				close(fds[1]);
			}
		}
		at += (i + 1);

		if (args->wait_each) {
			ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
			if (ret) {
				fprintf(stderr, "wait_each failed: %d\n", ret);
				ret = -1;
				goto cleanup;
			}
			while (io_uring_peek_cqe(&ring, &cqe) == 0) {
				recv_cqe[recv_cqes++] = *cqe;
				if (!(cqe->flags & IORING_CQE_F_MORE))
					early_error = true;
				io_uring_cqe_seen(&ring, cqe);
			}
			if (early_error)
				break;
		}
	}

	close(fds[1]);

	/* allow sends to finish */
	usleep(1000);

	if ((args->stream && !early_error) || recv_cqes < min_cqes) {
		unsigned int to_wait = 1;

		if (recv_cqes < min_cqes)
			to_wait = min_cqes - recv_cqes;
		ret = io_uring_wait_cqes(&ring, &cqe, to_wait, &timeout, NULL);
		if (ret && ret != -ETIME) {
			fprintf(stderr, "wait final failed: %d\n", ret);
			ret = -1;
			goto cleanup;
		}
	}

	while (io_uring_peek_cqe(&ring, &cqe) == 0) {
		recv_cqe[recv_cqes++] = *cqe;
		io_uring_cqe_seen(&ring, cqe);
	}

	ret = -1;
	at = &send_buff[0];
	if (recv_cqes < min_cqes) {
		if (recv_cqes > 0 && recv_cqe[0].res == -EINVAL) {
			/* multishot recv not supported at all */
			ret = -ENORECVMULTISHOT;
			goto cleanup;
		}
		/* some kernels apparently don't check ->ioprio, skip */
		ret = -ENORECVMULTISHOT;
		goto cleanup;
	}
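	/*
	 * Walk the collected CQEs: each one except the last must carry
	 * IORING_CQE_F_MORE and a selected buffer, and the final CQE must
	 * match the injected early error (if any).
	 */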
	for (i = 0; i < recv_cqes; i++) {
		cqe = &recv_cqe[i];

		bool const is_last = i == recv_cqes - 1;

		/*
		 * Older kernels could terminate multishot early due to overflow,
		 * but later ones will not. So discriminate based on the MORE flag.
		 */
		bool const early_last = args->early_error == ERROR_EARLY_OVERFLOW &&
					!args->wait_each &&
					i >= N_CQE_OVERFLOW &&
					!(cqe->flags & IORING_CQE_F_MORE);

		bool const should_be_last =
			(cqe->res <= 0) ||
			(args->stream && is_last) ||
			early_last;
		int *this_recv;
		int orig_payload_size = cqe->res;

		if (should_be_last) {
			int used_res = cqe->res;

			if (!is_last) {
				fprintf(stderr, "not last cqe had error %d\n", i);
				goto cleanup;
			}

			switch (args->early_error) {
			case ERROR_NOT_ENOUGH_BUFFERS:
				if (cqe->res != -ENOBUFS) {
					fprintf(stderr,
						"ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
					goto cleanup;
				}
				break;
			case ERROR_EARLY_OVERFLOW:
				if (cqe->res < 0) {
					fprintf(stderr,
						"ERROR_EARLY_OVERFLOW: res %d\n", cqe->res);
					goto cleanup;
				}
				break;
			case ERROR_EARLY_CLOSE_RECEIVER:
				if (cqe->res != -ECANCELED) {
					fprintf(stderr,
						"ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
					goto cleanup;
				}
				break;
			case ERROR_NONE:
			case ERROR_EARLY_CLOSE_SENDER:
				if (args->recvmsg && (cqe->flags & IORING_CQE_F_BUFFER)) {
					void *buff = recv_buffs[cqe->flags >> 16];
					struct io_uring_recvmsg_out *o =
						io_uring_recvmsg_validate(buff, cqe->res, &msg);

					if (!o) {
						fprintf(stderr, "invalid buff\n");
						goto cleanup;
					}
					if (o->payloadlen != 0) {
						fprintf(stderr, "expected 0 payloadlen, got %u\n",
							o->payloadlen);
						goto cleanup;
					}
					used_res = 0;
				} else if (cqe->res != 0) {
					fprintf(stderr, "early error: res %d\n", cqe->res);
					goto cleanup;
				}
				break;
			case ERROR_EARLY_LAST:
				fprintf(stderr, "bad error_early\n");
				goto cleanup;
			}

			if (cqe->res <= 0 && (cqe->flags & IORING_CQE_F_BUFFER)) {
				fprintf(stderr, "final BUFFER flag set\n");
				goto cleanup;
			}

			if (cqe->flags & IORING_CQE_F_MORE) {
				fprintf(stderr, "final MORE flag set\n");
				goto cleanup;
			}

			if (used_res <= 0)
				continue;
		} else {
			if (!(cqe->flags & IORING_CQE_F_MORE)) {
				fprintf(stderr, "MORE flag not set\n");
				goto cleanup;
			}
		}

		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
			fprintf(stderr, "BUFFER flag not set\n");
			goto cleanup;
		}

		this_recv = recv_buffs[cqe->flags >> 16];

		if (args->recvmsg) {
			struct io_uring_recvmsg_out *o = io_uring_recvmsg_validate(
				this_recv, cqe->res, &msg);

			if (!o) {
				fprintf(stderr, "bad recvmsg\n");
				goto cleanup;
			}
			orig_payload_size = o->payloadlen;

			if (!args->stream) {
				struct cmsghdr *cmsg;

				if (o->namelen < sizeof(struct sockaddr_in)) {
					fprintf(stderr, "bad addr len %d\n",
						o->namelen);
					goto cleanup;
				}
				if (check_sockaddr((struct sockaddr_in *)io_uring_recvmsg_name(o)))
					goto cleanup;

				cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
				if (!cmsg ||
				    cmsg->cmsg_level != IPPROTO_IP ||
				    cmsg->cmsg_type != IP_RECVORIGDSTADDR) {
					fprintf(stderr, "bad cmsg\n");
					goto cleanup;
				}
				if (check_sockaddr((struct sockaddr_in *)CMSG_DATA(cmsg)))
					goto cleanup;
				cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
				if (cmsg) {
					fprintf(stderr, "unexpected extra cmsg\n");
					goto cleanup;
				}
			}

			this_recv = (int *)io_uring_recvmsg_payload(o, &msg);
			cqe->res = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
			if (o->payloadlen != cqe->res) {
				if (!(o->flags & MSG_TRUNC)) {
					fprintf(stderr, "expected truncated flag\n");
					goto cleanup;
				}
				total_dropped_bytes += (o->payloadlen - cqe->res);
			}
		}

		total_recv_bytes += cqe->res;

		if (cqe->res % 4 != 0) {
			/*
			 * doesn't seem to happen in practice, would need some
			 * work to remove this requirement
			 */
			fprintf(stderr, "unexpectedly unaligned buffer cqe->res=%d\n", cqe->res);
			goto cleanup;
		}

		/*
		 * for tcp: check that the data arrived in order
		 * for udp: find the matching send by size, then validate its data
		 */
		if (!args->stream) {
			int sent_idx = orig_payload_size / sizeof(*at) - 1;

			if (sent_idx < 0 || sent_idx >= N) {
				fprintf(stderr, "Bad sent idx: %d\n", sent_idx);
				goto cleanup;
			}
			at = sent_buffs[sent_idx];
		}
		for (j = 0; j < cqe->res / 4; j++) {
			int sent = *at++;
			int recv = *this_recv++;

			if (sent != recv) {
				fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
				goto cleanup;
			}
		}
	}

	if (args->early_error == ERROR_NONE &&
	    total_recv_bytes + total_dropped_bytes < total_sent_bytes) {
		fprintf(stderr,
			"missing recv: recv=%d dropped=%d sent=%d\n",
			total_recv_bytes, total_dropped_bytes, total_sent_bytes);
		goto cleanup;
	}

	ret = 0;
cleanup:
	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
		free(recv_buffs[i]);
	close(fds[0]);
	close(fds[1]);
	io_uring_queue_exit(&ring);

	return ret;
}

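/*
 * Provide only two single-byte buffers, send three datagrams, and expect two
 * good multishot CQEs followed by -ENOBUFS with IORING_CQE_F_MORE cleared.
 */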
static int test_enobuf(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqes[16];
	char buffs[256];
	int ret, i, fds[2];

	if (t_create_ring(8, &ring, 0) != T_SETUP_OK) {
		fprintf(stderr, "ring create\n");
		return -1;
	}

	ret = t_create_socket_pair(fds, false);
	if (ret) {
		fprintf(stderr, "t_create_socket_pair\n");
		return ret;
	}

	sqe = io_uring_get_sqe(&ring);
	assert(sqe);
	/* deliberately only 2 provided buffers */
	io_uring_prep_provide_buffers(sqe, &buffs[0], 1, 2, 0, 0);
	io_uring_sqe_set_data64(sqe, 0);

	sqe = io_uring_get_sqe(&ring);
	assert(sqe);
	io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
	io_uring_sqe_set_data64(sqe, 1);
	sqe->buf_group = 0;
	sqe->flags |= IOSQE_BUFFER_SELECT;

	ret = io_uring_submit(&ring);
	if (ret != 2) {
		fprintf(stderr, "bad submit %d\n", ret);
		return -1;
	}
	for (i = 0; i < 3; i++) {
		do {
			ret = write(fds[1], "?", 1);
		} while (ret == -1 && errno == EINTR);
	}

	ret = io_uring_wait_cqes(&ring, &cqes[0], 4, NULL, NULL);
	if (ret) {
		fprintf(stderr, "wait cqes\n");
		return ret;
	}

	ret = io_uring_peek_batch_cqe(&ring, &cqes[0], 4);
	if (ret != 4) {
		fprintf(stderr, "peek batch cqes\n");
		return -1;
	}

	/* provide buffers */
	assert(cqes[0]->user_data == 0);
	assert(cqes[0]->res == 0);

	/* valid recv */
	assert(cqes[1]->user_data == 1);
	assert(cqes[2]->user_data == 1);
	assert(cqes[1]->res == 1);
	assert(cqes[2]->res == 1);
	assert(cqes[1]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE));
	assert(cqes[2]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE));

	/* missing buffer */
	assert(cqes[3]->user_data == 1);
	assert(cqes[3]->res == -ENOBUFS);
	assert(!(cqes[3]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE)));

	close(fds[0]);
	close(fds[1]);
	io_uring_queue_exit(&ring);
	return 0;
}

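/*
 * Run test() for every combination of stream/wait_each/recvmsg/defer and
 * every early-error mode. Multishot recv that is unsupported by the running
 * kernel results in a skip rather than a failure.
 */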
int main(int argc, char *argv[])
{
	int ret;
	int loop;
	int early_error = 0;
	bool has_defer;

	if (argc > 1)
		return T_EXIT_SKIP;

	has_defer = t_probe_defer_taskrun();

	for (loop = 0; loop < 16; loop++) {
		struct args a = {
			.stream = loop & 0x01,
			.wait_each = loop & 0x02,
			.recvmsg = loop & 0x04,
			.defer = loop & 0x08,
		};
		if (a.defer && !has_defer)
			continue;
		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
			a.early_error = (enum early_error_t)early_error;
			ret = test(&a);
			if (ret) {
				if (ret == -ENORECVMULTISHOT) {
					if (loop == 0)
						return T_EXIT_SKIP;
					fprintf(stderr,
						"ENORECVMULTISHOT received but loop>0\n");
				}
				fprintf(stderr,
					"test stream=%d wait_each=%d recvmsg=%d early_error=%d "
					"defer=%d failed\n",
					a.stream, a.wait_each, a.recvmsg, a.early_error, a.defer);
				return T_EXIT_FAIL;
			}
		}
	}

	ret = test_enobuf();
	if (ret) {
		fprintf(stderr, "test_enobuf() failed: %d\n", ret);
		return T_EXIT_FAIL;
	}

	return T_EXIT_PASS;
}