• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Check that IORING_OP_CONNECT works, with and without other side
4  * being open.
5  */
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 
10 #include <errno.h>
11 #include <fcntl.h>
12 #include <unistd.h>
13 #include <poll.h>
14 #include <sys/socket.h>
15 #include <netinet/in.h>
16 #include <netinet/tcp.h>
17 #include <arpa/inet.h>
18 
19 #include "liburing.h"
20 
21 static int no_connect;
22 static int use_port;
23 
create_socket(void)24 static int create_socket(void)
25 {
26 	int fd;
27 
28 	fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
29 	if (fd == -1) {
30 		perror("socket()");
31 		return -1;
32 	}
33 
34 	return fd;
35 }
36 
submit_and_wait(struct io_uring * ring,int * res)37 static int submit_and_wait(struct io_uring *ring, int *res)
38 {
39 	struct io_uring_cqe *cqe;
40 	int ret;
41 
42 	ret = io_uring_submit_and_wait(ring, 1);
43 	if (ret != 1) {
44 		fprintf(stderr, "io_using_submit: got %d\n", ret);
45 		return 1;
46 	}
47 
48 	ret = io_uring_peek_cqe(ring, &cqe);
49 	if (ret) {
50 		fprintf(stderr, "io_uring_peek_cqe(): no cqe returned");
51 		return 1;
52 	}
53 
54 	*res = cqe->res;
55 	io_uring_cqe_seen(ring, cqe);
56 	return 0;
57 }
58 
wait_for(struct io_uring * ring,int fd,int mask)59 static int wait_for(struct io_uring *ring, int fd, int mask)
60 {
61 	struct io_uring_sqe *sqe;
62 	int ret, res;
63 
64 	sqe = io_uring_get_sqe(ring);
65 	if (!sqe) {
66 		fprintf(stderr, "unable to get sqe\n");
67 		return -1;
68 	}
69 
70 	io_uring_prep_poll_add(sqe, fd, mask);
71 	sqe->user_data = 2;
72 
73 	ret = submit_and_wait(ring, &res);
74 	if (ret)
75 		return -1;
76 
77 	if (res < 0) {
78 		fprintf(stderr, "poll(): failed with %d\n", res);
79 		return -1;
80 	}
81 
82 	return res;
83 }
84 
listen_on_socket(int fd)85 static int listen_on_socket(int fd)
86 {
87 	struct sockaddr_in addr;
88 	int ret;
89 
90 	memset(&addr, 0, sizeof(addr));
91 	addr.sin_family = AF_INET;
92 	addr.sin_port = use_port;
93 	addr.sin_addr.s_addr = 0x0100007fU;
94 
95 	ret = bind(fd, (struct sockaddr*)&addr, sizeof(addr));
96 	if (ret == -1) {
97 		perror("bind()");
98 		return -1;
99 	}
100 
101 	ret = listen(fd, 128);
102 	if (ret == -1) {
103 		perror("listen()");
104 		return -1;
105 	}
106 
107 	return 0;
108 }
109 
configure_connect(int fd,struct sockaddr_in * addr)110 static int configure_connect(int fd, struct sockaddr_in* addr)
111 {
112 	int ret, val = 1;
113 
114 	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
115 	if (ret == -1) {
116 		perror("setsockopt()");
117 		return -1;
118 	}
119 
120 	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
121 	if (ret == -1) {
122 		perror("setsockopt()");
123 		return -1;
124 	}
125 
126 	memset(addr, 0, sizeof(*addr));
127 	addr->sin_family = AF_INET;
128 	addr->sin_port = use_port;
129 	ret = inet_aton("127.0.0.1", &addr->sin_addr);
130 	return ret;
131 }
132 
connect_socket(struct io_uring * ring,int fd,int * code)133 static int connect_socket(struct io_uring *ring, int fd, int *code)
134 {
135 	struct sockaddr_in addr;
136 	int ret, res;
137 	socklen_t code_len = sizeof(*code);
138 	struct io_uring_sqe *sqe;
139 
140 	if (configure_connect(fd, &addr) == -1)
141 		return -1;
142 
143 	sqe = io_uring_get_sqe(ring);
144 	if (!sqe) {
145 		fprintf(stderr, "unable to get sqe\n");
146 		return -1;
147 	}
148 
149 	io_uring_prep_connect(sqe, fd, (struct sockaddr*)&addr, sizeof(addr));
150 	sqe->user_data = 1;
151 
152 	ret = submit_and_wait(ring, &res);
153 	if (ret)
154 		return -1;
155 
156 	if (res == -EINPROGRESS) {
157 		ret = wait_for(ring, fd, POLLOUT | POLLHUP | POLLERR);
158 		if (ret == -1)
159 			return -1;
160 
161 		int ev = (ret & POLLOUT) || (ret & POLLHUP) || (ret & POLLERR);
162 		if (!ev) {
163 			fprintf(stderr, "poll(): returned invalid value %#x\n", ret);
164 			return -1;
165 		}
166 
167 		ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, code, &code_len);
168 		if (ret == -1) {
169 			perror("getsockopt()");
170 			return -1;
171 		}
172 	} else
173 		*code = res;
174 	return 0;
175 }
176 
test_connect_with_no_peer(struct io_uring * ring)177 static int test_connect_with_no_peer(struct io_uring *ring)
178 {
179 	int connect_fd;
180 	int ret, code;
181 
182 	connect_fd = create_socket();
183 	if (connect_fd == -1)
184 		return -1;
185 
186 	ret = connect_socket(ring, connect_fd, &code);
187 	if (ret == -1)
188 		goto err;
189 
190 	if (code != -ECONNREFUSED) {
191 		if (code == -EINVAL || code == -EBADF || code == -EOPNOTSUPP) {
192 			fprintf(stdout, "No connect support, skipping\n");
193 			no_connect = 1;
194 			goto out;
195 		}
196 		fprintf(stderr, "connect failed with %d\n", code);
197 		goto err;
198 	}
199 
200 out:
201 	close(connect_fd);
202 	return 0;
203 
204 err:
205 	close(connect_fd);
206 	return -1;
207 }
208 
test_connect(struct io_uring * ring)209 static int test_connect(struct io_uring *ring)
210 {
211 	int accept_fd;
212 	int connect_fd;
213 	int ret, code;
214 
215 	accept_fd = create_socket();
216 	if (accept_fd == -1)
217 		return -1;
218 
219 	ret = listen_on_socket(accept_fd);
220 	if (ret == -1)
221 		goto err1;
222 
223 	connect_fd = create_socket();
224 	if (connect_fd == -1)
225 		goto err1;
226 
227 	ret = connect_socket(ring, connect_fd, &code);
228 	if (ret == -1)
229 		goto err2;
230 
231 	if (code != 0) {
232 		fprintf(stderr, "connect failed with %d\n", code);
233 		goto err2;
234 	}
235 
236 	close(connect_fd);
237 	close(accept_fd);
238 
239 	return 0;
240 
241 err2:
242 	close(connect_fd);
243 
244 err1:
245 	close(accept_fd);
246 	return -1;
247 }
248 
test_connect_timeout(struct io_uring * ring)249 static int test_connect_timeout(struct io_uring *ring)
250 {
251 	int connect_fd[2] = {-1, -1};
252 	int accept_fd = -1;
253 	int ret, code;
254 	struct sockaddr_in addr;
255 	struct io_uring_sqe *sqe;
256 	struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 100000};
257 
258 	connect_fd[0] = create_socket();
259 	if (connect_fd[0] == -1)
260 		return -1;
261 
262 	connect_fd[1] = create_socket();
263 	if (connect_fd[1] == -1)
264 		goto err;
265 
266 	accept_fd = create_socket();
267 	if (accept_fd == -1)
268 		goto err;
269 
270 	if (configure_connect(connect_fd[0], &addr) == -1)
271 		goto err;
272 
273 	if (configure_connect(connect_fd[1], &addr) == -1)
274 		goto err;
275 
276 	ret = bind(accept_fd, (struct sockaddr*)&addr, sizeof(addr));
277 	if (ret == -1) {
278 		perror("bind()");
279 		goto err;
280 	}
281 
282 	ret = listen(accept_fd, 0);  // no backlog in order to block connect_fd[1]
283 	if (ret == -1) {
284 		perror("listen()");
285 		goto err;
286 	}
287 
288 	// We first connect with one client socket in order to fill the accept queue.
289 	ret = connect_socket(ring, connect_fd[0], &code);
290 	if (ret == -1 || code != 0) {
291 		fprintf(stderr, "unable to connect\n");
292 		goto err;
293 	}
294 
295 	// We do not offload completion events from listening socket on purpose.
296 	// This way we create a state where the second connect request being stalled by OS.
297 	sqe = io_uring_get_sqe(ring);
298 	if (!sqe) {
299 		fprintf(stderr, "unable to get sqe\n");
300 		goto err;
301 	}
302 
303 	io_uring_prep_connect(sqe, connect_fd[1], (struct sockaddr*)&addr, sizeof(addr));
304 	sqe->user_data = 1;
305 	sqe->flags |= IOSQE_IO_LINK;
306 
307 	sqe = io_uring_get_sqe(ring);
308 	if (!sqe) {
309 		fprintf(stderr, "unable to get sqe\n");
310 		goto err;
311 	}
312 	io_uring_prep_link_timeout(sqe, &ts, 0);
313 	sqe->user_data = 2;
314 
315 	ret = io_uring_submit(ring);
316 	if (ret != 2) {
317 		fprintf(stderr, "submitted %d\n", ret);
318 		return -1;
319 	}
320 
321 	for (int i = 0; i < 2; i++) {
322 		int expected;
323 		struct io_uring_cqe *cqe;
324 
325 		ret = io_uring_wait_cqe(ring, &cqe);
326 		if (ret) {
327 			fprintf(stderr, "wait_cqe=%d\n", ret);
328 			return -1;
329 		}
330 
331 		expected = (cqe->user_data == 1) ? -ECANCELED : -ETIME;
332 		if (expected != cqe->res) {
333 			fprintf(stderr, "cqe %d, res %d, wanted %d\n",
334 					(int)cqe->user_data, cqe->res, expected);
335 			goto err;
336 		}
337 		io_uring_cqe_seen(ring, cqe);
338 	}
339 
340 	close(connect_fd[0]);
341 	close(connect_fd[1]);
342 	close(accept_fd);
343 	return 0;
344 
345 err:
346 	if (connect_fd[0] != -1)
347 		close(connect_fd[0]);
348 	if (connect_fd[1] != -1)
349 		close(connect_fd[1]);
350 
351 	if (accept_fd != -1)
352 		close(accept_fd);
353 	return -1;
354 }
355 
main(int argc,char * argv[])356 int main(int argc, char *argv[])
357 {
358 	struct io_uring ring;
359 	int ret;
360 
361 	if (argc > 1)
362 		return 0;
363 
364 	ret = io_uring_queue_init(8, &ring, 0);
365 	if (ret) {
366 		fprintf(stderr, "io_uring_queue_setup() = %d\n", ret);
367 		return 1;
368 	}
369 
370 	srand(getpid());
371 	use_port = (rand() % 61440) + 4096;
372 
373 	ret = test_connect_with_no_peer(&ring);
374 	if (ret == -1) {
375 		fprintf(stderr, "test_connect_with_no_peer(): failed\n");
376 		return 1;
377 	}
378 	if (no_connect)
379 		return 0;
380 
381 	ret = test_connect(&ring);
382 	if (ret == -1) {
383 		fprintf(stderr, "test_connect(): failed\n");
384 		return 1;
385 	}
386 
387 	ret = test_connect_timeout(&ring);
388 	if (ret == -1) {
389 		fprintf(stderr, "test_connect_timeout(): failed\n");
390 		return 1;
391 	}
392 
393 	io_uring_queue_exit(&ring);
394 	return 0;
395 }
396