1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Test MSG_WAITALL with datagram sockets, with a send splice into two.
4 */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14
15 #include "liburing.h"
16 #include "helpers.h"
17
18 #define MAX_MSG 128
19 #define HOST "127.0.0.1"
20 static __be16 bind_port;
21 struct recv_data {
22 pthread_barrier_t barrier;
23 int use_recvmsg;
24 struct msghdr msg;
25 };
26
recv_prep(struct io_uring * ring,struct iovec * iov,int * sock,struct recv_data * rd)27 static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
28 struct recv_data *rd)
29 {
30 struct sockaddr_in saddr;
31 struct io_uring_sqe *sqe;
32 int sockfd, ret, val;
33
34 memset(&saddr, 0, sizeof(saddr));
35 saddr.sin_family = AF_INET;
36 saddr.sin_addr.s_addr = htonl(INADDR_ANY);
37
38 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
39 if (sockfd < 0) {
40 perror("socket");
41 return 1;
42 }
43
44 val = 1;
45 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
46
47 if (t_bind_ephemeral_port(sockfd, &saddr)) {
48 perror("bind");
49 goto err;
50 }
51 bind_port = saddr.sin_port;
52
53 sqe = io_uring_get_sqe(ring);
54 if (!rd->use_recvmsg) {
55 io_uring_prep_recv(sqe, sockfd, iov->iov_base, iov->iov_len,
56 MSG_WAITALL);
57 } else {
58 struct msghdr *msg = &rd->msg;
59
60 memset(msg, 0, sizeof(*msg));
61 msg->msg_namelen = sizeof(struct sockaddr_in);
62 msg->msg_iov = iov;
63 msg->msg_iovlen = 1;
64 io_uring_prep_recvmsg(sqe, sockfd, msg, MSG_WAITALL);
65 }
66
67 sqe->user_data = 2;
68
69 ret = io_uring_submit(ring);
70 if (ret <= 0) {
71 fprintf(stderr, "submit failed: %d\n", ret);
72 goto err;
73 }
74
75 *sock = sockfd;
76 return 0;
77 err:
78 close(sockfd);
79 return 1;
80 }
81
do_recv(struct io_uring * ring)82 static int do_recv(struct io_uring *ring)
83 {
84 struct io_uring_cqe *cqe;
85 int ret;
86
87 ret = io_uring_wait_cqe(ring, &cqe);
88 if (ret) {
89 fprintf(stdout, "wait_cqe: %d\n", ret);
90 goto err;
91 }
92 if (cqe->res == -EINVAL) {
93 fprintf(stdout, "recv not supported, skipping\n");
94 return 0;
95 }
96 if (cqe->res < 0) {
97 fprintf(stderr, "failed cqe: %d\n", cqe->res);
98 goto err;
99 }
100 if (cqe->res != MAX_MSG * sizeof(int) / 2) {
101 fprintf(stderr, "got wrong length: %d\n", cqe->res);
102 goto err;
103 }
104
105 io_uring_cqe_seen(ring, cqe);
106 return 0;
107 err:
108 return 1;
109 }
110
recv_fn(void * data)111 static void *recv_fn(void *data)
112 {
113 struct recv_data *rd = data;
114 int buf[MAX_MSG];
115 struct iovec iov = {
116 .iov_base = buf,
117 .iov_len = sizeof(buf),
118 };
119 struct io_uring_params p = { };
120 struct io_uring ring;
121 int ret, sock;
122
123 ret = t_create_ring_params(1, &ring, &p);
124 if (ret == T_SETUP_SKIP) {
125 pthread_barrier_wait(&rd->barrier);
126 ret = 0;
127 goto err;
128 } else if (ret < 0) {
129 pthread_barrier_wait(&rd->barrier);
130 goto err;
131 }
132
133 ret = recv_prep(&ring, &iov, &sock, rd);
134 if (ret) {
135 fprintf(stderr, "recv_prep failed: %d\n", ret);
136 goto err;
137 }
138 pthread_barrier_wait(&rd->barrier);
139 ret = do_recv(&ring);
140 close(sock);
141 io_uring_queue_exit(&ring);
142 err:
143 return (void *)(intptr_t)ret;
144 }
145
do_send(void)146 static int do_send(void)
147 {
148 struct sockaddr_in saddr;
149 struct io_uring ring;
150 struct io_uring_cqe *cqe;
151 struct io_uring_sqe *sqe;
152 int sockfd, ret, i;
153 struct iovec iov;
154 int *buf;
155
156 ret = io_uring_queue_init(2, &ring, 0);
157 if (ret) {
158 fprintf(stderr, "queue init failed: %d\n", ret);
159 return 1;
160 }
161
162 buf = malloc(MAX_MSG * sizeof(int));
163 for (i = 0; i < MAX_MSG; i++)
164 buf[i] = i;
165
166 memset(&saddr, 0, sizeof(saddr));
167 saddr.sin_family = AF_INET;
168 saddr.sin_port = bind_port;
169 inet_pton(AF_INET, HOST, &saddr.sin_addr);
170
171 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
172 if (sockfd < 0) {
173 perror("socket");
174 free(buf);
175 return 1;
176 }
177
178 ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
179 if (ret < 0) {
180 perror("connect");
181 free(buf);
182 return 1;
183 }
184
185 iov.iov_base = buf;
186 iov.iov_len = MAX_MSG * sizeof(int) / 2;
187 for (i = 0; i < 2; i++) {
188 sqe = io_uring_get_sqe(&ring);
189 io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
190 sqe->user_data = 1;
191
192 ret = io_uring_submit(&ring);
193 if (ret <= 0) {
194 fprintf(stderr, "submit failed: %d\n", ret);
195 goto err;
196 }
197 usleep(10000);
198 iov.iov_base += iov.iov_len;
199 }
200
201 for (i = 0; i < 2; i++) {
202 ret = io_uring_wait_cqe(&ring, &cqe);
203 if (cqe->res == -EINVAL) {
204 fprintf(stdout, "send not supported, skipping\n");
205 close(sockfd);
206 free(buf);
207 return 0;
208 }
209 if (cqe->res != iov.iov_len) {
210 fprintf(stderr, "failed cqe: %d\n", cqe->res);
211 goto err;
212 }
213 io_uring_cqe_seen(&ring, cqe);
214 }
215
216 close(sockfd);
217 free(buf);
218 return 0;
219 err:
220 close(sockfd);
221 free(buf);
222 return 1;
223 }
224
test(int use_recvmsg)225 static int test(int use_recvmsg)
226 {
227 pthread_t recv_thread;
228 struct recv_data rd;
229 int ret;
230 void *retval;
231
232 pthread_barrier_init(&rd.barrier, NULL, 2);
233 rd.use_recvmsg = use_recvmsg;
234
235 ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
236 if (ret) {
237 fprintf(stderr, "Thread create failed: %d\n", ret);
238 return 1;
239 }
240
241 pthread_barrier_wait(&rd.barrier);
242 do_send();
243 pthread_join(recv_thread, &retval);
244 pthread_barrier_destroy(&rd.barrier);
245 return (intptr_t)retval;
246 }
247
main(int argc,char * argv[])248 int main(int argc, char *argv[])
249 {
250 int ret;
251
252 if (argc > 1)
253 return 0;
254
255 ret = test(0);
256 if (ret) {
257 fprintf(stderr, "test recv failed\n");
258 return ret;
259 }
260
261 ret = test(1);
262 if (ret) {
263 fprintf(stderr, "test recvmsg failed\n");
264 return ret;
265 }
266
267 return 0;
268 }
269