• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Test case for socket read/write through IORING_OP_READV and
4  * IORING_OP_WRITEV, using both TCP and UNIX sockets, and both blocking and
5  * non-blocking IO.
6  *
7  * Heavily based on a test case from Hrvoje Zeba <zeba.hrvoje@gmail.com>
8  */
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <stdint.h>
12 #include <assert.h>
13 
14 #include <pthread.h>
15 #include <errno.h>
16 #include <fcntl.h>
17 #include <unistd.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23 
24 #include "helpers.h"
25 #include "liburing.h"
26 
/* Size in bytes of the buffer the receiver hands to each readv request. */
#define RECV_BUFF_SIZE 2
/* Size in bytes of the buffer the sender hands to each writev request. */
#define SEND_BUFF_SIZE 3
29 
/*
 * Configuration for one test run, passed by pointer to both the receiver
 * and the sender thread.
 */
struct params {
	/* Non-zero: AF_INET/TCP on 127.0.0.1. Zero: AF_UNIX stream socket. */
	int tcp;
	/* Non-zero: O_NONBLOCK is set on the connected sockets. */
	int non_blocking;
	/* TCP only: port the receiver bound to; stored by rcv(), read by snd(). */
	__be16 bind_port;
};
35 
/* Protects rcv_ready and is the mutex paired with cond. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by set_rcv_ready() after rcv_ready has been set. */
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Set to 1 by the receiver once it is listening; main() resets it to 0 before each run. */
static int rcv_ready = 0;
39 
set_rcv_ready(void)40 static void set_rcv_ready(void)
41 {
42 	pthread_mutex_lock(&mutex);
43 
44 	rcv_ready = 1;
45 	pthread_cond_signal(&cond);
46 
47 	pthread_mutex_unlock(&mutex);
48 }
49 
wait_for_rcv_ready(void)50 static void wait_for_rcv_ready(void)
51 {
52 	pthread_mutex_lock(&mutex);
53 
54 	while (!rcv_ready)
55 		pthread_cond_wait(&cond, &mutex);
56 
57 	pthread_mutex_unlock(&mutex);
58 }
59 
rcv(void * arg)60 static void *rcv(void *arg)
61 {
62 	struct params *p = arg;
63 	int s0;
64 	int res;
65 
66 	if (p->tcp) {
67 		int ret, val = 1;
68 
69 		s0 = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
70 		res = setsockopt(s0, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
71 		assert(res != -1);
72 		res = setsockopt(s0, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
73 		assert(res != -1);
74 
75 		struct sockaddr_in addr;
76 
77 		addr.sin_family = AF_INET;
78 		addr.sin_addr.s_addr = inet_addr("127.0.0.1");
79 		ret = t_bind_ephemeral_port(s0, &addr);
80 		assert(!ret);
81 		p->bind_port = addr.sin_port;
82 	} else {
83 		s0 = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
84 		assert(s0 != -1);
85 
86 		struct sockaddr_un addr;
87 		memset(&addr, 0, sizeof(addr));
88 
89 		addr.sun_family = AF_UNIX;
90 		memcpy(addr.sun_path, "\0sock", 6);
91 		res = bind(s0, (struct sockaddr *) &addr, sizeof(addr));
92 		assert(res != -1);
93 	}
94 	res = listen(s0, 128);
95 	assert(res != -1);
96 
97 	set_rcv_ready();
98 
99 	int s1 = accept(s0, NULL, NULL);
100 	assert(s1 != -1);
101 
102 	if (p->non_blocking) {
103 		int flags = fcntl(s1, F_GETFL, 0);
104 		assert(flags != -1);
105 
106 		flags |= O_NONBLOCK;
107 		res = fcntl(s1, F_SETFL, flags);
108 		assert(res != -1);
109 	}
110 
111 	struct io_uring m_io_uring;
112 	void *ret = NULL;
113 
114 	res = io_uring_queue_init(32, &m_io_uring, 0);
115 	assert(res >= 0);
116 
117 	int bytes_read = 0;
118 	int expected_byte = 0;
119 	int done = 0;
120 
121 	while (!done && bytes_read != 33) {
122 		char buff[RECV_BUFF_SIZE];
123 		struct iovec iov;
124 
125 		iov.iov_base = buff;
126 		iov.iov_len = sizeof(buff);
127 
128 		struct io_uring_sqe *sqe = io_uring_get_sqe(&m_io_uring);
129 		assert(sqe != NULL);
130 
131 		io_uring_prep_readv(sqe, s1, &iov, 1, 0);
132 
133 		res = io_uring_submit(&m_io_uring);
134 		assert(res != -1);
135 
136 		struct io_uring_cqe *cqe;
137 		unsigned head;
138 		unsigned count = 0;
139 
140 		while (!done && count != 1) {
141 			io_uring_for_each_cqe(&m_io_uring, head, cqe) {
142 				if (cqe->res < 0)
143 					assert(cqe->res == -EAGAIN);
144 				else {
145 					int i;
146 
147 					for (i = 0; i < cqe->res; i++) {
148 						if (buff[i] != expected_byte) {
149 							fprintf(stderr,
150 								"Received %d, wanted %d\n",
151 								buff[i], expected_byte);
152 							ret++;
153 							done = 1;
154 						 }
155 						 expected_byte++;
156 					}
157 					bytes_read += cqe->res;
158 				}
159 
160 				count++;
161 			}
162 
163 			assert(count <= 1);
164 			io_uring_cq_advance(&m_io_uring, count);
165 		}
166 	}
167 
168 	shutdown(s1, SHUT_RDWR);
169 	close(s1);
170 	close(s0);
171 	io_uring_queue_exit(&m_io_uring);
172 	return ret;
173 }
174 
snd(void * arg)175 static void *snd(void *arg)
176 {
177 	struct params *p = arg;
178 	int s0;
179 	int ret;
180 
181 	wait_for_rcv_ready();
182 
183 	if (p->tcp) {
184 		int val = 1;
185 
186 		s0 = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
187 		ret = setsockopt(s0, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
188 		assert(ret != -1);
189 
190 		struct sockaddr_in addr;
191 
192 		addr.sin_family = AF_INET;
193 		addr.sin_port = p->bind_port;
194 		addr.sin_addr.s_addr = inet_addr("127.0.0.1");
195 		ret = connect(s0, (struct sockaddr*) &addr, sizeof(addr));
196 		assert(ret != -1);
197 	} else {
198 		s0 = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
199 		assert(s0 != -1);
200 
201 		struct sockaddr_un addr;
202 		memset(&addr, 0, sizeof(addr));
203 
204 		addr.sun_family = AF_UNIX;
205 		memcpy(addr.sun_path, "\0sock", 6);
206 		ret = connect(s0, (struct sockaddr*) &addr, sizeof(addr));
207 		assert(ret != -1);
208 	}
209 
210 	if (p->non_blocking) {
211 		int flags = fcntl(s0, F_GETFL, 0);
212 		assert(flags != -1);
213 
214 		flags |= O_NONBLOCK;
215 		ret = fcntl(s0, F_SETFL, flags);
216 		assert(ret != -1);
217 	}
218 
219 	struct io_uring m_io_uring;
220 
221 	ret = io_uring_queue_init(32, &m_io_uring, 0);
222 	assert(ret >= 0);
223 
224 	int bytes_written = 0;
225 	int done = 0;
226 
227 	while (!done && bytes_written != 33) {
228 		char buff[SEND_BUFF_SIZE];
229 		int i;
230 
231 		for (i = 0; i < SEND_BUFF_SIZE; i++)
232 			buff[i] = i + bytes_written;
233 
234 		struct iovec iov;
235 
236 		iov.iov_base = buff;
237 		iov.iov_len = sizeof(buff);
238 
239 		struct io_uring_sqe *sqe = io_uring_get_sqe(&m_io_uring);
240 		assert(sqe != NULL);
241 
242 		io_uring_prep_writev(sqe, s0, &iov, 1, 0);
243 
244 		ret = io_uring_submit(&m_io_uring);
245 		assert(ret != -1);
246 
247 		struct io_uring_cqe *cqe;
248 		unsigned head;
249 		unsigned count = 0;
250 
251 		while (!done && count != 1) {
252 			io_uring_for_each_cqe(&m_io_uring, head, cqe) {
253 				if (cqe->res < 0) {
254 					if (cqe->res == -EPIPE) {
255 						done = 1;
256 						break;
257 					}
258 					assert(cqe->res == -EAGAIN);
259 				} else {
260 					bytes_written += cqe->res;
261 				}
262 
263 				count++;
264 			}
265 
266 			assert(count <= 1);
267 			io_uring_cq_advance(&m_io_uring, count);
268 		}
269 		usleep(100000);
270 	}
271 
272 	shutdown(s0, SHUT_RDWR);
273 	close(s0);
274 	io_uring_queue_exit(&m_io_uring);
275 	return NULL;
276 }
277 
main(int argc,char * argv[])278 int main(int argc, char *argv[])
279 {
280 	struct params p;
281 	pthread_t t1, t2;
282 	void *res1, *res2;
283 	int i, exit_val = T_EXIT_PASS;
284 
285 	if (argc > 1)
286 		return T_EXIT_SKIP;
287 
288 	for (i = 0; i < 4; i++) {
289 		p.tcp = i & 1;
290 		p.non_blocking = (i & 2) >> 1;
291 
292 		rcv_ready = 0;
293 
294 		pthread_create(&t1, NULL, rcv, &p);
295 		pthread_create(&t2, NULL, snd, &p);
296 		pthread_join(t1, &res1);
297 		pthread_join(t2, &res2);
298 		if (res1 || res2) {
299 			fprintf(stderr, "Failed tcp=%d, non_blocking=%d\n", p.tcp, p.non_blocking);
300 			exit_val = T_EXIT_FAIL;
301 		}
302 	}
303 
304 	return exit_val;
305 }
306