• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Simple test case demonstrating send and recv through io_uring
4  */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14 
15 #include "liburing.h"
16 #include "helpers.h"
17 
18 static char str[] = "This is a test of send and recv over io_uring!";
19 
20 #define MAX_MSG	128
21 
22 #define PORT	10200
23 #define HOST	"127.0.0.1"
24 
25 #if 0
26 #	define io_uring_prep_send io_uring_prep_write
27 #	define io_uring_prep_recv io_uring_prep_read
28 #endif
29 
recv_prep(struct io_uring * ring,struct iovec * iov,int * sock,int registerfiles)30 static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
31 		     int registerfiles)
32 {
33 	struct sockaddr_in saddr;
34 	struct io_uring_sqe *sqe;
35 	int sockfd, ret, val, use_fd;
36 
37 	memset(&saddr, 0, sizeof(saddr));
38 	saddr.sin_family = AF_INET;
39 	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
40 	saddr.sin_port = htons(PORT);
41 
42 	sockfd = socket(AF_INET, SOCK_DGRAM, 0);
43 	if (sockfd < 0) {
44 		perror("socket");
45 		return 1;
46 	}
47 
48 	val = 1;
49 	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
50 
51 	ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
52 	if (ret < 0) {
53 		perror("bind");
54 		goto err;
55 	}
56 
57 	if (registerfiles) {
58 		ret = io_uring_register_files(ring, &sockfd, 1);
59 		if (ret) {
60 			fprintf(stderr, "file reg failed\n");
61 			goto err;
62 		}
63 		use_fd = 0;
64 	} else {
65 		use_fd = sockfd;
66 	}
67 
68 	sqe = io_uring_get_sqe(ring);
69 	io_uring_prep_recv(sqe, use_fd, iov->iov_base, iov->iov_len, 0);
70 	if (registerfiles)
71 		sqe->flags |= IOSQE_FIXED_FILE;
72 	sqe->user_data = 2;
73 
74 	ret = io_uring_submit(ring);
75 	if (ret <= 0) {
76 		fprintf(stderr, "submit failed: %d\n", ret);
77 		goto err;
78 	}
79 
80 	*sock = sockfd;
81 	return 0;
82 err:
83 	close(sockfd);
84 	return 1;
85 }
86 
do_recv(struct io_uring * ring,struct iovec * iov)87 static int do_recv(struct io_uring *ring, struct iovec *iov)
88 {
89 	struct io_uring_cqe *cqe;
90 	int ret;
91 
92 	ret = io_uring_wait_cqe(ring, &cqe);
93 	if (ret) {
94 		fprintf(stdout, "wait_cqe: %d\n", ret);
95 		goto err;
96 	}
97 	if (cqe->res == -EINVAL) {
98 		fprintf(stdout, "recv not supported, skipping\n");
99 		return 0;
100 	}
101 	if (cqe->res < 0) {
102 		fprintf(stderr, "failed cqe: %d\n", cqe->res);
103 		goto err;
104 	}
105 
106 	if (cqe->res -1 != strlen(str)) {
107 		fprintf(stderr, "got wrong length: %d/%d\n", cqe->res,
108 							(int) strlen(str) + 1);
109 		goto err;
110 	}
111 
112 	if (strcmp(str, iov->iov_base)) {
113 		fprintf(stderr, "string mismatch\n");
114 		goto err;
115 	}
116 
117 	return 0;
118 err:
119 	return 1;
120 }
121 
/*
 * Arguments handed from test() to the receiver thread (recv_fn).
 */
struct recv_data {
	/*
	 * Locked by test() before the thread is created; recv_fn unlocks it
	 * to let test() proceed with the send.
	 */
	pthread_mutex_t mutex;
	/* Non-zero: recv_fn requests IORING_SETUP_SQPOLL for its ring. */
	int use_sqthread;
	/* Non-zero: the recv socket is registered and used as a fixed file. */
	int registerfiles;
};
127 
recv_fn(void * data)128 static void *recv_fn(void *data)
129 {
130 	struct recv_data *rd = data;
131 	char buf[MAX_MSG + 1];
132 	struct iovec iov = {
133 		.iov_base = buf,
134 		.iov_len = sizeof(buf) - 1,
135 	};
136 	struct io_uring_params p = { };
137 	struct io_uring ring;
138 	int ret, sock;
139 
140 	if (rd->use_sqthread)
141 		p.flags = IORING_SETUP_SQPOLL;
142 	ret = t_create_ring_params(1, &ring, &p);
143 	if (ret == T_SETUP_SKIP) {
144 		pthread_mutex_unlock(&rd->mutex);
145 		ret = 0;
146 		goto err;
147 	} else if (ret < 0) {
148 		pthread_mutex_unlock(&rd->mutex);
149 		goto err;
150 	}
151 
152 	if (rd->use_sqthread && !rd->registerfiles) {
153 		if (!(p.features & IORING_FEAT_SQPOLL_NONFIXED)) {
154 			fprintf(stdout, "Non-registered SQPOLL not available, skipping\n");
155 			pthread_mutex_unlock(&rd->mutex);
156 			goto err;
157 		}
158 	}
159 
160 	ret = recv_prep(&ring, &iov, &sock, rd->registerfiles);
161 	if (ret) {
162 		fprintf(stderr, "recv_prep failed: %d\n", ret);
163 		goto err;
164 	}
165 	pthread_mutex_unlock(&rd->mutex);
166 	ret = do_recv(&ring, &iov);
167 
168 	close(sock);
169 	io_uring_queue_exit(&ring);
170 err:
171 	return (void *)(intptr_t)ret;
172 }
173 
do_send(void)174 static int do_send(void)
175 {
176 	struct sockaddr_in saddr;
177 	struct iovec iov = {
178 		.iov_base = str,
179 		.iov_len = sizeof(str),
180 	};
181 	struct io_uring ring;
182 	struct io_uring_cqe *cqe;
183 	struct io_uring_sqe *sqe;
184 	int sockfd, ret;
185 
186 	ret = io_uring_queue_init(1, &ring, 0);
187 	if (ret) {
188 		fprintf(stderr, "queue init failed: %d\n", ret);
189 		return 1;
190 	}
191 
192 	memset(&saddr, 0, sizeof(saddr));
193 	saddr.sin_family = AF_INET;
194 	saddr.sin_port = htons(PORT);
195 	inet_pton(AF_INET, HOST, &saddr.sin_addr);
196 
197 	sockfd = socket(AF_INET, SOCK_DGRAM, 0);
198 	if (sockfd < 0) {
199 		perror("socket");
200 		return 1;
201 	}
202 
203 	ret = connect(sockfd, &saddr, sizeof(saddr));
204 	if (ret < 0) {
205 		perror("connect");
206 		return 1;
207 	}
208 
209 	sqe = io_uring_get_sqe(&ring);
210 	io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
211 	sqe->user_data = 1;
212 
213 	ret = io_uring_submit(&ring);
214 	if (ret <= 0) {
215 		fprintf(stderr, "submit failed: %d\n", ret);
216 		goto err;
217 	}
218 
219 	ret = io_uring_wait_cqe(&ring, &cqe);
220 	if (cqe->res == -EINVAL) {
221 		fprintf(stdout, "send not supported, skipping\n");
222 		close(sockfd);
223 		return 0;
224 	}
225 	if (cqe->res != iov.iov_len) {
226 		fprintf(stderr, "failed cqe: %d\n", cqe->res);
227 		goto err;
228 	}
229 
230 	close(sockfd);
231 	return 0;
232 err:
233 	close(sockfd);
234 	return 1;
235 }
236 
test(int use_sqthread,int regfiles)237 static int test(int use_sqthread, int regfiles)
238 {
239 	pthread_mutexattr_t attr;
240 	pthread_t recv_thread;
241 	struct recv_data rd;
242 	int ret;
243 	void *retval;
244 
245 	pthread_mutexattr_init(&attr);
246 	pthread_mutexattr_setpshared(&attr, 1);
247 	pthread_mutex_init(&rd.mutex, &attr);
248 	pthread_mutex_lock(&rd.mutex);
249 	rd.use_sqthread = use_sqthread;
250 	rd.registerfiles = regfiles;
251 
252 	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
253 	if (ret) {
254 		fprintf(stderr, "Thread create failed: %d\n", ret);
255 		pthread_mutex_unlock(&rd.mutex);
256 		return 1;
257 	}
258 
259 	pthread_mutex_lock(&rd.mutex);
260 	do_send();
261 	pthread_join(recv_thread, &retval);
262 	return (int)(intptr_t)retval;
263 }
264 
/*
 * Entry point: run the send/recv round trip for each supported combination
 * of SQPOLL and registered files, stopping at the first failure.
 *
 * Invoked with any argument the program does nothing and returns 0.
 * Returns 0 if all combinations pass, otherwise the failing test's result.
 */
int main(int argc, char *argv[])
{
	static const struct {
		int sqthread;
		int regfiles;
		const char *fail_msg;
	} cases[] = {
		{ 0, 0, "test sqthread=0 failed\n" },
		{ 1, 1, "test sqthread=1 reg=1 failed\n" },
		{ 1, 0, "test sqthread=1 reg=0 failed\n" },
	};
	size_t idx;
	int rc;

	if (argc > 1)
		return 0;

	for (idx = 0; idx < sizeof(cases) / sizeof(cases[0]); idx++) {
		rc = test(cases[idx].sqthread, cases[idx].regfiles);
		if (rc) {
			fputs(cases[idx].fail_msg, stderr);
			return rc;
		}
	}

	return 0;
}
292