1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Test MSG_WAITALL with datagram sockets, with a send splice into two.
4 */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14
15 #include "liburing.h"
16 #include "helpers.h"
17
18 #define MAX_MSG 128
19 #define HOST "127.0.0.1"
20 static __be16 bind_port;
21 struct recv_data {
22 pthread_barrier_t barrier;
23 int use_recvmsg;
24 struct msghdr msg;
25 };
26
recv_prep(struct io_uring * ring,struct iovec * iov,int * sock,struct recv_data * rd)27 static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
28 struct recv_data *rd)
29 {
30 struct sockaddr_in saddr;
31 struct io_uring_sqe *sqe;
32 int sockfd, ret, val;
33
34 memset(&saddr, 0, sizeof(saddr));
35 saddr.sin_family = AF_INET;
36 saddr.sin_addr.s_addr = htonl(INADDR_ANY);
37
38 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
39 if (sockfd < 0) {
40 perror("socket");
41 return 1;
42 }
43
44 val = 1;
45 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
46
47 if (t_bind_ephemeral_port(sockfd, &saddr)) {
48 perror("bind");
49 goto err;
50 }
51 bind_port = saddr.sin_port;
52
53 sqe = io_uring_get_sqe(ring);
54 if (!rd->use_recvmsg) {
55 io_uring_prep_recv(sqe, sockfd, iov->iov_base, iov->iov_len,
56 MSG_WAITALL);
57 } else {
58 struct msghdr *msg = &rd->msg;
59
60 memset(msg, 0, sizeof(*msg));
61 msg->msg_namelen = sizeof(struct sockaddr_in);
62 msg->msg_iov = iov;
63 msg->msg_iovlen = 1;
64 io_uring_prep_recvmsg(sqe, sockfd, msg, MSG_WAITALL);
65 }
66
67 sqe->user_data = 2;
68
69 ret = io_uring_submit(ring);
70 if (ret <= 0) {
71 fprintf(stderr, "submit failed: %d\n", ret);
72 goto err;
73 }
74
75 *sock = sockfd;
76 return 0;
77 err:
78 close(sockfd);
79 return 1;
80 }
81
do_recv(struct io_uring * ring)82 static int do_recv(struct io_uring *ring)
83 {
84 struct io_uring_cqe *cqe;
85 int ret;
86
87 ret = io_uring_wait_cqe(ring, &cqe);
88 if (ret) {
89 fprintf(stdout, "wait_cqe: %d\n", ret);
90 goto err;
91 }
92 if (cqe->res == -EINVAL) {
93 fprintf(stdout, "recv not supported, skipping\n");
94 return 0;
95 }
96 if (cqe->res < 0) {
97 fprintf(stderr, "failed cqe: %d\n", cqe->res);
98 goto err;
99 }
100 if (cqe->res != MAX_MSG * sizeof(int) / 2) {
101 fprintf(stderr, "got wrong length: %d\n", cqe->res);
102 goto err;
103 }
104
105 io_uring_cqe_seen(ring, cqe);
106 return 0;
107 err:
108 return 1;
109 }
110
recv_fn(void * data)111 static void *recv_fn(void *data)
112 {
113 struct recv_data *rd = data;
114 int buf[MAX_MSG];
115 struct iovec iov = {
116 .iov_base = buf,
117 .iov_len = sizeof(buf),
118 };
119 struct io_uring_params p = { };
120 struct io_uring ring;
121 int ret, sock;
122
123 ret = t_create_ring_params(1, &ring, &p);
124 if (ret == T_SETUP_SKIP) {
125 pthread_barrier_wait(&rd->barrier);
126 ret = 0;
127 goto err;
128 } else if (ret < 0) {
129 pthread_barrier_wait(&rd->barrier);
130 goto err;
131 }
132
133 ret = recv_prep(&ring, &iov, &sock, rd);
134 if (ret) {
135 fprintf(stderr, "recv_prep failed: %d\n", ret);
136 goto err;
137 }
138 pthread_barrier_wait(&rd->barrier);
139 ret = do_recv(&ring);
140 close(sock);
141 io_uring_queue_exit(&ring);
142 err:
143 return (void *)(intptr_t)ret;
144 }
145
do_send(void)146 static int do_send(void)
147 {
148 struct sockaddr_in saddr;
149 struct io_uring ring;
150 struct io_uring_cqe *cqe;
151 struct io_uring_sqe *sqe;
152 int sockfd, ret, i;
153 struct iovec iov;
154 int *buf;
155
156 ret = io_uring_queue_init(2, &ring, 0);
157 if (ret) {
158 fprintf(stderr, "queue init failed: %d\n", ret);
159 return 1;
160 }
161
162 buf = malloc(MAX_MSG * sizeof(int));
163 for (i = 0; i < MAX_MSG; i++)
164 buf[i] = i;
165
166 memset(&saddr, 0, sizeof(saddr));
167 saddr.sin_family = AF_INET;
168 saddr.sin_port = bind_port;
169 inet_pton(AF_INET, HOST, &saddr.sin_addr);
170
171 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
172 if (sockfd < 0) {
173 perror("socket");
174 return 1;
175 }
176
177 ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
178 if (ret < 0) {
179 perror("connect");
180 return 1;
181 }
182
183 iov.iov_base = buf;
184 iov.iov_len = MAX_MSG * sizeof(int) / 2;
185 for (i = 0; i < 2; i++) {
186 sqe = io_uring_get_sqe(&ring);
187 io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
188 sqe->user_data = 1;
189
190 ret = io_uring_submit(&ring);
191 if (ret <= 0) {
192 fprintf(stderr, "submit failed: %d\n", ret);
193 goto err;
194 }
195 usleep(10000);
196 iov.iov_base += iov.iov_len;
197 }
198
199 for (i = 0; i < 2; i++) {
200 ret = io_uring_wait_cqe(&ring, &cqe);
201 if (cqe->res == -EINVAL) {
202 fprintf(stdout, "send not supported, skipping\n");
203 close(sockfd);
204 return 0;
205 }
206 if (cqe->res != iov.iov_len) {
207 fprintf(stderr, "failed cqe: %d\n", cqe->res);
208 goto err;
209 }
210 io_uring_cqe_seen(&ring, cqe);
211 }
212
213 close(sockfd);
214 return 0;
215 err:
216 close(sockfd);
217 return 1;
218 }
219
test(int use_recvmsg)220 static int test(int use_recvmsg)
221 {
222 pthread_t recv_thread;
223 struct recv_data rd;
224 int ret;
225 void *retval;
226
227 pthread_barrier_init(&rd.barrier, NULL, 2);
228 rd.use_recvmsg = use_recvmsg;
229
230 ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
231 if (ret) {
232 fprintf(stderr, "Thread create failed: %d\n", ret);
233 return 1;
234 }
235
236 pthread_barrier_wait(&rd.barrier);
237 do_send();
238 pthread_join(recv_thread, &retval);
239 pthread_barrier_destroy(&rd.barrier);
240 return (intptr_t)retval;
241 }
242
main(int argc,char * argv[])243 int main(int argc, char *argv[])
244 {
245 int ret;
246
247 if (argc > 1)
248 return 0;
249
250 ret = test(0);
251 if (ret) {
252 fprintf(stderr, "test recv failed\n");
253 return ret;
254 }
255
256 ret = test(1);
257 if (ret) {
258 fprintf(stderr, "test recvmsg failed\n");
259 return ret;
260 }
261
262 return 0;
263 }
264