• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // SPDX-License-Identifier: MIT
2 
3 #include <errno.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <unistd.h>
8 #include <arpa/inet.h>
9 #include <sys/types.h>
10 #include <sys/socket.h>
11 #include <pthread.h>
12 #include <assert.h>
13 
14 #include "liburing.h"
15 #include "helpers.h"
16 
/* sentinel error value: kernel lacks multishot recv support, skip the test */
#define ENORECVMULTISHOT 9999

/* failure scenario injected partway through a test run (ERROR_NONE = clean) */
enum early_error_t {
	ERROR_NONE  = 0,
	ERROR_NOT_ENOUGH_BUFFERS,	/* under-provide the buffer pool */
	ERROR_EARLY_CLOSE_SENDER,	/* shut down + close the sending fd */
	ERROR_EARLY_CLOSE_RECEIVER,	/* cancel the armed multishot recv */
	ERROR_EARLY_OVERFLOW,		/* tiny CQ ring to force overflow */
	ERROR_EARLY_LAST		/* count only; not a real scenario */
};

/* one test configuration, iterated over by main() */
struct args {
	bool stream;			/* SOCK_STREAM instead of SOCK_DGRAM */
	bool wait_each;			/* reap CQEs after every send */
	bool recvmsg;			/* multishot recvmsg instead of recv */
	enum early_error_t early_error;	/* injected failure mode */
	bool defer;			/* IORING_SETUP_DEFER_TASKRUN */
};
35 
/*
 * Validate that *in is an AF_INET address equal to 127.0.0.1.
 *
 * Returns 0 on match, -1 (with a diagnostic on stderr) otherwise.
 */
static int check_sockaddr(struct sockaddr_in *in)
{
	struct in_addr expected;

	inet_pton(AF_INET, "127.0.0.1", &expected);
	if (in->sin_family != AF_INET) {
		/*
		 * sin_family is stored in host byte order; the old code
		 * wrapped it in htons() and printed a garbled value.
		 */
		fprintf(stderr, "bad family %d\n", (int)in->sin_family);
		return -1;
	}
	if (memcmp(&expected, &in->sin_addr, sizeof(in->sin_addr))) {
		char buff[256];
		const char *addr = inet_ntop(AF_INET, &in->sin_addr, buff, sizeof(buff));

		fprintf(stderr, "unexpected address %s\n", addr ? addr : "INVALID");
		return -1;
	}
	return 0;
}
54 
test(struct args * args)55 static int test(struct args *args)
56 {
57 	int const N = 8;
58 	int const N_BUFFS = N * 64;
59 	int const N_CQE_OVERFLOW = 4;
60 	int const min_cqes = args->early_error ? 2 : 8;
61 	int const NAME_LEN = sizeof(struct sockaddr_storage);
62 	int const CONTROL_LEN = CMSG_ALIGN(sizeof(struct sockaddr_storage))
63 					+ sizeof(struct cmsghdr);
64 	struct io_uring ring;
65 	struct io_uring_cqe *cqe;
66 	struct io_uring_sqe *sqe;
67 	int fds[2], ret, i, j;
68 	int total_sent_bytes = 0, total_recv_bytes = 0, total_dropped_bytes = 0;
69 	int send_buff[256];
70 	int *sent_buffs[N_BUFFS];
71 	int *recv_buffs[N_BUFFS];
72 	int *at;
73 	struct io_uring_cqe recv_cqe[N_BUFFS];
74 	int recv_cqes = 0;
75 	bool early_error = false;
76 	bool early_error_started = false;
77 	struct __kernel_timespec timeout = {
78 		.tv_sec = 1,
79 	};
80 	struct msghdr msg;
81 	struct io_uring_params params = { };
82 	int n_sqe = 32;
83 
84 	memset(recv_buffs, 0, sizeof(recv_buffs));
85 
86 	if (args->defer)
87 		params.flags |= IORING_SETUP_SINGLE_ISSUER |
88 				IORING_SETUP_DEFER_TASKRUN;
89 
90 	if (args->early_error == ERROR_EARLY_OVERFLOW) {
91 		params.flags |= IORING_SETUP_CQSIZE;
92 		params.cq_entries = N_CQE_OVERFLOW;
93 		n_sqe = N_CQE_OVERFLOW;
94 	}
95 
96 	ret = io_uring_queue_init_params(n_sqe, &ring, &params);
97 	if (ret) {
98 		fprintf(stderr, "queue init failed: %d\n", ret);
99 		return ret;
100 	}
101 
102 	ret = t_create_socket_pair(fds, args->stream);
103 	if (ret) {
104 		fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
105 		return ret;
106 	}
107 
108 	if (!args->stream) {
109 		bool val = true;
110 
111 		/* force some cmsgs to come back to us */
112 		ret = setsockopt(fds[0], IPPROTO_IP, IP_RECVORIGDSTADDR, &val,
113 				 sizeof(val));
114 		if (ret) {
115 			fprintf(stderr, "setsockopt failed %d\n", errno);
116 			goto cleanup;
117 		}
118 	}
119 
120 	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
121 		send_buff[i] = i;
122 
123 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
124 		/* prepare some different sized buffers */
125 		int buffer_size = (i % 2 == 0 && (args->stream || args->recvmsg)) ? 1 : N;
126 
127 		buffer_size *= sizeof(int);
128 		if (args->recvmsg) {
129 			buffer_size +=
130 				sizeof(struct io_uring_recvmsg_out) +
131 				NAME_LEN +
132 				CONTROL_LEN;
133 		}
134 
135 		recv_buffs[i] = malloc(buffer_size);
136 
137 		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
138 			continue;
139 
140 		sqe = io_uring_get_sqe(&ring);
141 		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
142 					buffer_size, 1, 7, i);
143 		io_uring_sqe_set_data64(sqe, 0x999);
144 		memset(recv_buffs[i], 0xcc, buffer_size);
145 		if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) < 0) {
146 			fprintf(stderr, "provide buffers failed: %d\n", ret);
147 			ret = -1;
148 			goto cleanup;
149 		}
150 		io_uring_cqe_seen(&ring, cqe);
151 	}
152 
153 	sqe = io_uring_get_sqe(&ring);
154 	if (args->recvmsg) {
155 		unsigned int flags = 0;
156 
157 		if (!args->stream)
158 			flags |= MSG_TRUNC;
159 
160 		memset(&msg, 0, sizeof(msg));
161 		msg.msg_namelen = NAME_LEN;
162 		msg.msg_controllen = CONTROL_LEN;
163 		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, flags);
164 	} else {
165 		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
166 	}
167 	sqe->flags |= IOSQE_BUFFER_SELECT;
168 	sqe->buf_group = 7;
169 	io_uring_sqe_set_data64(sqe, 1234);
170 	io_uring_submit(&ring);
171 
172 	at = &send_buff[0];
173 	total_sent_bytes = 0;
174 	for (i = 0; i < N; i++) {
175 		int to_send = sizeof(*at) * (i+1);
176 
177 		total_sent_bytes += to_send;
178 		sent_buffs[i] = at;
179 		if (send(fds[1], at, to_send, 0) != to_send) {
180 			if (early_error_started)
181 				break;
182 			fprintf(stderr, "send failed %d\n", errno);
183 			ret = -1;
184 			goto cleanup;
185 		}
186 
187 		if (i == 2) {
188 			if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
189 				/* allow previous sends to complete */
190 				usleep(1000);
191 				io_uring_get_events(&ring);
192 
193 				sqe = io_uring_get_sqe(&ring);
194 				io_uring_prep_recv(sqe, fds[0], NULL, 0, 0);
195 				io_uring_prep_cancel64(sqe, 1234, 0);
196 				io_uring_sqe_set_data64(sqe, 0x888);
197 				sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
198 				io_uring_submit(&ring);
199 				early_error_started = true;
200 
201 				/* allow the cancel to complete */
202 				usleep(1000);
203 				io_uring_get_events(&ring);
204 			}
205 			if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
206 				early_error_started = true;
207 				shutdown(fds[1], SHUT_RDWR);
208 				close(fds[1]);
209 			}
210 		}
211 		at += (i+1);
212 
213 		if (args->wait_each) {
214 			ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
215 			if (ret) {
216 				fprintf(stderr, "wait_each failed: %d\n", ret);
217 				ret = -1;
218 				goto cleanup;
219 			}
220 			while (io_uring_peek_cqe(&ring, &cqe) == 0) {
221 				recv_cqe[recv_cqes++] = *cqe;
222 				if (cqe->flags & IORING_CQE_F_MORE) {
223 					io_uring_cqe_seen(&ring, cqe);
224 				} else {
225 					early_error = true;
226 					io_uring_cqe_seen(&ring, cqe);
227 				}
228 			}
229 			if (early_error)
230 				break;
231 		}
232 	}
233 
234 	close(fds[1]);
235 
236 	/* allow sends to finish */
237 	usleep(1000);
238 
239 	if ((args->stream && !early_error) || recv_cqes < min_cqes) {
240 		unsigned int to_wait = 1;
241 
242 		if (recv_cqes < min_cqes)
243 			to_wait = min_cqes - recv_cqes;
244 		ret = io_uring_wait_cqes(&ring, &cqe, to_wait, &timeout, NULL);
245 		if (ret && ret != -ETIME) {
246 			fprintf(stderr, "wait final failed: %d\n", ret);
247 			ret = -1;
248 			goto cleanup;
249 		}
250 	}
251 
252 	while (io_uring_peek_cqe(&ring, &cqe) == 0) {
253 		recv_cqe[recv_cqes++] = *cqe;
254 		io_uring_cqe_seen(&ring, cqe);
255 	}
256 
257 	ret = -1;
258 	at = &send_buff[0];
259 	if (recv_cqes < min_cqes) {
260 		if (recv_cqes > 0 && recv_cqe[0].res == -EINVAL) {
261 			return -ENORECVMULTISHOT;
262 		}
263 		/* some kernels apparently don't check ->ioprio, skip */
264 		ret = -ENORECVMULTISHOT;
265 		goto cleanup;
266 	}
267 	for (i = 0; i < recv_cqes; i++) {
268 		cqe = &recv_cqe[i];
269 
270 		bool const is_last = i == recv_cqes - 1;
271 
272 		/*
273 		 * Older kernels could terminate multishot early due to overflow,
274 		 * but later ones will not. So discriminate based on the MORE flag.
275 		 */
276 		bool const early_last = args->early_error == ERROR_EARLY_OVERFLOW &&
277 					!args->wait_each &&
278 					i >= N_CQE_OVERFLOW &&
279 					!(cqe->flags & IORING_CQE_F_MORE);
280 
281 		bool const should_be_last =
282 			(cqe->res <= 0) ||
283 			(args->stream && is_last) ||
284 			early_last;
285 		int *this_recv;
286 		int orig_payload_size = cqe->res;
287 
288 
289 		if (should_be_last) {
290 			int used_res = cqe->res;
291 
292 			if (!is_last) {
293 				fprintf(stderr, "not last cqe had error %d\n", i);
294 				goto cleanup;
295 			}
296 
297 			switch (args->early_error) {
298 			case ERROR_NOT_ENOUGH_BUFFERS:
299 				if (cqe->res != -ENOBUFS) {
300 					fprintf(stderr,
301 						"ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
302 					goto cleanup;
303 				}
304 				break;
305 			case ERROR_EARLY_OVERFLOW:
306 				if (cqe->res < 0) {
307 					fprintf(stderr,
308 						"ERROR_EARLY_OVERFLOW: res %d\n", cqe->res);
309 					goto cleanup;
310 				}
311 				break;
312 			case ERROR_EARLY_CLOSE_RECEIVER:
313 				if (cqe->res != -ECANCELED) {
314 					fprintf(stderr,
315 						"ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
316 					goto cleanup;
317 				}
318 				break;
319 			case ERROR_NONE:
320 			case ERROR_EARLY_CLOSE_SENDER:
321 				if (args->recvmsg && (cqe->flags & IORING_CQE_F_BUFFER)) {
322 					void *buff = recv_buffs[cqe->flags >> 16];
323 					struct io_uring_recvmsg_out *o =
324 						io_uring_recvmsg_validate(buff, cqe->res, &msg);
325 
326 					if (!o) {
327 						fprintf(stderr, "invalid buff\n");
328 						goto cleanup;
329 					}
330 					if (o->payloadlen != 0) {
331 						fprintf(stderr, "expected 0 payloadlen, got %u\n",
332 							o->payloadlen);
333 						goto cleanup;
334 					}
335 					used_res = 0;
336 				} else if (cqe->res != 0) {
337 					fprintf(stderr, "early error: res %d\n", cqe->res);
338 					goto cleanup;
339 				}
340 				break;
341 			case ERROR_EARLY_LAST:
342 				fprintf(stderr, "bad error_early\n");
343 				goto cleanup;
344 			}
345 
346 			if (cqe->res <= 0 && cqe->flags & IORING_CQE_F_BUFFER) {
347 				fprintf(stderr, "final BUFFER flag set\n");
348 				goto cleanup;
349 			}
350 
351 			if (cqe->flags & IORING_CQE_F_MORE) {
352 				fprintf(stderr, "final MORE flag set\n");
353 				goto cleanup;
354 			}
355 
356 			if (used_res <= 0)
357 				continue;
358 		} else {
359 			if (!(cqe->flags & IORING_CQE_F_MORE)) {
360 				fprintf(stderr, "MORE flag not set\n");
361 				goto cleanup;
362 			}
363 		}
364 
365 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
366 			fprintf(stderr, "BUFFER flag not set\n");
367 			goto cleanup;
368 		}
369 
370 		this_recv = recv_buffs[cqe->flags >> 16];
371 
372 		if (args->recvmsg) {
373 			struct io_uring_recvmsg_out *o = io_uring_recvmsg_validate(
374 				this_recv, cqe->res, &msg);
375 
376 			if (!o) {
377 				fprintf(stderr, "bad recvmsg\n");
378 				goto cleanup;
379 			}
380 			orig_payload_size = o->payloadlen;
381 
382 			if (!args->stream) {
383 				orig_payload_size = o->payloadlen;
384 
385 				struct cmsghdr *cmsg;
386 
387 				if (o->namelen < sizeof(struct sockaddr_in)) {
388 					fprintf(stderr, "bad addr len %d",
389 						o->namelen);
390 					goto cleanup;
391 				}
392 				if (check_sockaddr((struct sockaddr_in *)io_uring_recvmsg_name(o)))
393 					goto cleanup;
394 
395 				cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
396 				if (!cmsg ||
397 				    cmsg->cmsg_level != IPPROTO_IP ||
398 				    cmsg->cmsg_type != IP_RECVORIGDSTADDR) {
399 					fprintf(stderr, "bad cmsg");
400 					goto cleanup;
401 				}
402 				if (check_sockaddr((struct sockaddr_in *)CMSG_DATA(cmsg)))
403 					goto cleanup;
404 				cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
405 				if (cmsg) {
406 					fprintf(stderr, "unexpected extra cmsg\n");
407 					goto cleanup;
408 				}
409 
410 			}
411 
412 			this_recv = (int *)io_uring_recvmsg_payload(o, &msg);
413 			cqe->res = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
414 			if (o->payloadlen != cqe->res) {
415 				if (!(o->flags & MSG_TRUNC)) {
416 					fprintf(stderr, "expected truncated flag\n");
417 					goto cleanup;
418 				}
419 				total_dropped_bytes += (o->payloadlen - cqe->res);
420 			}
421 		}
422 
423 		total_recv_bytes += cqe->res;
424 
425 		if (cqe->res % 4 != 0) {
426 			/*
427 			 * doesn't seem to happen in practice, would need some
428 			 * work to remove this requirement
429 			 */
430 			fprintf(stderr, "unexpectedly aligned buffer cqe->res=%d\n", cqe->res);
431 			goto cleanup;
432 		}
433 
434 		/*
435 		 * for tcp: check buffer arrived in order
436 		 * for udp: based on size validate data based on size
437 		 */
438 		if (!args->stream) {
439 			int sent_idx = orig_payload_size / sizeof(*at) - 1;
440 
441 			if (sent_idx < 0 || sent_idx > N) {
442 				fprintf(stderr, "Bad sent idx: %d\n", sent_idx);
443 				goto cleanup;
444 			}
445 			at = sent_buffs[sent_idx];
446 		}
447 		for (j = 0; j < cqe->res / 4; j++) {
448 			int sent = *at++;
449 			int recv = *this_recv++;
450 
451 			if (sent != recv) {
452 				fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
453 				goto cleanup;
454 			}
455 		}
456 	}
457 
458 	if (args->early_error == ERROR_NONE &&
459 	    total_recv_bytes + total_dropped_bytes < total_sent_bytes) {
460 		fprintf(stderr,
461 			"missing recv: recv=%d dropped=%d sent=%d\n",
462 			total_recv_bytes, total_sent_bytes, total_dropped_bytes);
463 		goto cleanup;
464 	}
465 
466 	ret = 0;
467 cleanup:
468 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
469 		free(recv_buffs[i]);
470 	close(fds[0]);
471 	close(fds[1]);
472 	io_uring_queue_exit(&ring);
473 
474 	return ret;
475 }
476 
test_enobuf(void)477 static int test_enobuf(void)
478 {
479 	struct io_uring ring;
480 	struct io_uring_sqe *sqe;
481 	struct io_uring_cqe *cqes[16];
482 	char buffs[256];
483 	int ret, i, fds[2];
484 
485 	if (t_create_ring(8, &ring, 0) != T_SETUP_OK) {
486 		fprintf(stderr, "ring create\n");
487 		return -1;
488 	}
489 
490 	ret = t_create_socket_pair(fds, false);
491 	if (ret) {
492 		fprintf(stderr, "t_create_socket_pair\n");
493 		return ret;
494 	}
495 
496 	sqe = io_uring_get_sqe(&ring);
497 	assert(sqe);
498 	/* deliberately only 2 provided buffers */
499 	io_uring_prep_provide_buffers(sqe, &buffs[0], 1, 2, 0, 0);
500 	io_uring_sqe_set_data64(sqe, 0);
501 
502 	sqe = io_uring_get_sqe(&ring);
503 	assert(sqe);
504 	io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
505 	io_uring_sqe_set_data64(sqe, 1);
506 	sqe->buf_group = 0;
507 	sqe->flags |= IOSQE_BUFFER_SELECT;
508 
509 	ret = io_uring_submit(&ring);
510 	if (ret != 2) {
511 		fprintf(stderr, "bad submit %d\n", ret);
512 		return -1;
513 	}
514 	for (i = 0; i < 3; i++) {
515 		do {
516 			ret = write(fds[1], "?", 1);
517 		} while (ret == -1 && errno == EINTR);
518 	}
519 
520 	ret = io_uring_wait_cqes(&ring, &cqes[0], 4, NULL, NULL);
521 	if (ret) {
522 		fprintf(stderr, "wait cqes\n");
523 		return ret;
524 	}
525 
526 	ret = io_uring_peek_batch_cqe(&ring, &cqes[0], 4);
527 	if (ret != 4) {
528 		fprintf(stderr, "peek batch cqes\n");
529 		return -1;
530 	}
531 
532 	/* provide buffers */
533 	assert(cqes[0]->user_data == 0);
534 	assert(cqes[0]->res == 0);
535 
536 	/* valid recv */
537 	assert(cqes[1]->user_data == 1);
538 	assert(cqes[2]->user_data == 1);
539 	assert(cqes[1]->res == 1);
540 	assert(cqes[2]->res == 1);
541 	assert(cqes[1]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE));
542 	assert(cqes[2]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE));
543 
544 	/* missing buffer */
545 	assert(cqes[3]->user_data == 1);
546 	assert(cqes[3]->res == -ENOBUFS);
547 	assert(!(cqes[3]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE)));
548 
549 	close(fds[0]);
550 	close(fds[1]);
551 	io_uring_queue_exit(&ring);
552 	return 0;
553 }
554 
main(int argc,char * argv[])555 int main(int argc, char *argv[])
556 {
557 	int ret;
558 	int loop;
559 	int early_error = 0;
560 	bool has_defer;
561 
562 	if (argc > 1)
563 		return T_EXIT_SKIP;
564 
565 	has_defer = t_probe_defer_taskrun();
566 
567 	for (loop = 0; loop < 16; loop++) {
568 		struct args a = {
569 			.stream = loop & 0x01,
570 			.wait_each = loop & 0x2,
571 			.recvmsg = loop & 0x04,
572 			.defer = loop & 0x08,
573 		};
574 		if (a.defer && !has_defer)
575 			continue;
576 		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
577 			a.early_error = (enum early_error_t)early_error;
578 			ret = test(&a);
579 			if (ret) {
580 				if (ret == -ENORECVMULTISHOT) {
581 					if (loop == 0)
582 						return T_EXIT_SKIP;
583 					fprintf(stderr,
584 						"ENORECVMULTISHOT received but loop>0\n");
585 				}
586 				fprintf(stderr,
587 					"test stream=%d wait_each=%d recvmsg=%d early_error=%d "
588 					" defer=%d failed\n",
589 					a.stream, a.wait_each, a.recvmsg, a.early_error, a.defer);
590 				return T_EXIT_FAIL;
591 			}
592 		}
593 	}
594 
595 	ret = test_enobuf();
596 	if (ret) {
597 		fprintf(stderr, "test_enobuf() failed: %d\n", ret);
598 		return T_EXIT_FAIL;
599 	}
600 
601 	return T_EXIT_PASS;
602 }
603