• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Simple ping/pong backend which can use the io_uring NAPI support.
4  *
5  * Should be run as root so it can switch to the SCHED_FIFO scheduling class;
6  * without root it still works, just without real-time scheduling.
7  *
8  * Example:
9  *
10  * sudo examples/napi-busy-poll-server -l -a 192.168.2.2 -n100000 \
11  *	-p4444 -t10 -b -u
12  *
13  * will respond to 100k packets, using NAPI.
14  */
15 #include <ctype.h>
16 #include <errno.h>
17 #include <getopt.h>
18 #include <liburing.h>
19 #include <math.h>
20 #include <sched.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <time.h>
27 #include <unistd.h>
28 #include <arpa/inet.h>
29 #include <netdb.h>
30 #include <netinet/in.h>
31 
/* Size of the receive buffer handed to recvmsg(). */
#define MAXBUFLEN 100
/* Capacity of the --port option string, including the terminating NUL. */
#define PORTNOLEN 10
/* Capacity of the --address option string, including the terminating NUL. */
#define ADDRLEN   80
/* Number of entries requested when the io_uring is created. */
#define RINGSIZE  1024

/*
 * Yield ch if it is a printable character, '#' otherwise.
 *
 * Every use of the argument is parenthesized so the unsigned char cast
 * covers the whole expression: isprint() is undefined for values outside
 * the unsigned char range (other than EOF). Note that ch is evaluated twice.
 */
#define printable(ch) (isprint((unsigned char)(ch)) ? (ch) : '#')

/* Operation tags stored in the top byte of each SQE's user_data. */
enum {
	IOURING_RECV,
	IOURING_SEND,
	IOURING_RECVMSG,
	IOURING_SENDMSG
};
45 
46 struct ctx
47 {
48 	struct io_uring     ring;
49 	union {
50 		struct sockaddr_in6 saddr6;
51 		struct sockaddr_in saddr;
52 	};
53 	struct iovec        iov;
54 	struct msghdr       msg;
55 
56 	int sockfd;
57 	int buffer_len;
58 	int num_pings;
59 	bool napi_check;
60 
61 	union {
62 		char buffer[MAXBUFLEN];
63 		struct timespec ts;
64 	};
65 };
66 
67 struct options
68 {
69 	int  num_pings;
70 	__u32 timeout;
71 
72 	bool listen;
73 	bool defer_tw;
74 	bool sq_poll;
75 	bool busy_loop;
76 	bool prefer_busy_poll;
77 	bool ipv6;
78 
79 	char port[PORTNOLEN];
80 	char addr[ADDRLEN];
81 };
82 
83 static struct options opt;
84 
/*
 * Long-option table for getopt_long().
 *
 * Each entry's has_arg must agree with the short-option string used in
 * main():
 * - "-s" takes a value (the handler calls atoi(optarg)), so --sqpoll
 *   requires one too.
 * - "-u" takes none, so --prefer must not consume the following argv
 *   element.
 */
static const struct option longopts[] =
{
	{"address"  , required_argument, NULL, 'a'},
	{"busy"     , no_argument,       NULL, 'b'},
	{"help"     , no_argument,       NULL, 'h'},
	{"listen"   , no_argument,       NULL, 'l'},
	{"num_pings", required_argument, NULL, 'n'},
	{"port"     , required_argument, NULL, 'p'},
	{"prefer"   , no_argument,       NULL, 'u'},
	{"sqpoll"   , required_argument, NULL, 's'},
	{"timeout"  , required_argument, NULL, 't'},
	{NULL       , 0,                 NULL,  0 }
};
98 
/*
 * Print the command-line synopsis and a per-option description to stderr.
 *
 * @name: program name (normally argv[0]) shown in the synopsis line.
 */
static void printUsage(const char *name)
{
	fprintf(stderr,
	"Usage: %s [-l|--listen] [-a|--address ip_address] [-p|--port port-no] [-s|--sqpoll 0|1]"
	" [-b|--busy] [-n|--num_pings pings] [-t|--timeout busy-poll-timeout] [-u|--prefer] [-6] [-d 0|1] [-h|--help]\n"
	"--listen\n"
	"-l        : Server mode\n"
	"--address\n"
	"-a        : local address to bind (IPv4, or IPv6 with -6)\n"
	"--busy\n"
	"-b        : busy poll io_uring instead of blocking.\n"
	"--num_pings\n"
	"-n        : number of pings\n"
	"--port\n"
	"-p        : port\n"
	"--sqpoll\n"
	"-s        : Configure io_uring to use SQPOLL thread (1 = enable)\n"
	"--timeout\n"
	"-t        : Configure NAPI busy poll timeout\n"
	"--prefer\n"
	"-u        : prefer NAPI busy poll\n"
	"-6        : use IPV6\n"
	"-d        : use IORING_SETUP_DEFER_TASKRUN (1 = enable; overrides -s)\n"
	"--help\n"
	"-h        : Display this usage message\n\n",
	name);
}
125 
/*
 * Report a command-line problem together with the offending option letter
 * (non-printable letters are shown as '#'). Prints nothing unless both a
 * message and a non-zero option are supplied.
 */
static void printError(const char *msg, int opt)
{
	if (msg == NULL || opt == 0)
		return;

	fprintf(stderr, "%s (-%c)\n", msg, printable(opt));
}
131 
/*
 * Best-effort switch of the calling process to SCHED_FIFO at that class's
 * maximum priority. A failure (e.g. when not running as root) is reported on
 * stderr and otherwise ignored.
 */
static void setProcessScheduler(void)
{
	struct sched_param fifo = {
		.sched_priority = sched_get_priority_max(SCHED_FIFO),
	};

	if (sched_setscheduler(0, SCHED_FIFO, &fifo) >= 0)
		return;

	fprintf(stderr, "sched_setscheduler() failed: (%d) %s\n",
		errno, strerror(errno));
}
141 
/*
 * Pack an operation tag and a file descriptor into one 64-bit user_data
 * value: the tag goes in bits 56..63, the descriptor (as uint32_t) in bits
 * 0..31.
 */
static uint64_t encodeUserData(char type, int fd)
{
	const uint64_t tag  = (uint64_t)type << 56;
	const uint64_t desc = (uint32_t)fd;

	return tag | desc;
}
146 
/*
 * Inverse of encodeUserData(): split user_data into the operation tag
 * (bits 56..63) and the file descriptor (bits 0..31).
 */
static void decodeUserData(uint64_t data, char *type, int *fd)
{
	uint64_t tag = data >> 56;
	uint64_t low = data & 0xffffffffU;

	*type = tag;
	*fd   = low;
}
152 
opTypeToStr(char type)153 static const char *opTypeToStr(char type)
154 {
155 	const char *res;
156 
157 	switch (type) {
158 	case IOURING_RECV:
159 		res = "IOURING_RECV";
160 		break;
161 	case IOURING_SEND:
162 		res = "IOURING_SEND";
163 		break;
164 	case IOURING_RECVMSG:
165 		res = "IOURING_RECVMSG";
166 		break;
167 	case IOURING_SENDMSG:
168 		res = "IOURING_SENDMSG";
169 		break;
170 	default:
171 		res = "Unknown";
172 	}
173 
174 	return res;
175 }
176 
reportNapi(struct ctx * ctx)177 static void reportNapi(struct ctx *ctx)
178 {
179 	unsigned int napi_id = 0;
180 	socklen_t len = sizeof(napi_id);
181 
182 	getsockopt(ctx->sockfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
183 	if (napi_id)
184 		printf(" napi id: %d\n", napi_id);
185 	else
186 		printf(" unassigned napi id\n");
187 
188 	ctx->napi_check = true;
189 }
190 
sendPing(struct ctx * ctx)191 static void sendPing(struct ctx *ctx)
192 {
193 	struct io_uring_sqe *sqe = io_uring_get_sqe(&ctx->ring);
194 
195 	io_uring_prep_sendmsg(sqe, ctx->sockfd, &ctx->msg, 0);
196 	sqe->user_data = encodeUserData(IOURING_SENDMSG, ctx->sockfd);
197 }
198 
receivePing(struct ctx * ctx)199 static void receivePing(struct ctx *ctx)
200 {
201 	struct io_uring_sqe *sqe;
202 
203 	bzero(&ctx->msg, sizeof(struct msghdr));
204 	if (opt.ipv6) {
205 		ctx->msg.msg_name    = &ctx->saddr6;
206 		ctx->msg.msg_namelen = sizeof(struct sockaddr_in6);
207 	} else {
208 		ctx->msg.msg_name    = &ctx->saddr;
209 		ctx->msg.msg_namelen = sizeof(struct sockaddr_in);
210 	}
211 	ctx->iov.iov_base    = ctx->buffer;
212 	ctx->iov.iov_len     = MAXBUFLEN;
213 	ctx->msg.msg_iov     = &ctx->iov;
214 	ctx->msg.msg_iovlen  = 1;
215 
216 	sqe = io_uring_get_sqe(&ctx->ring);
217 	io_uring_prep_recvmsg(sqe, ctx->sockfd, &ctx->msg, 0);
218 	sqe->user_data = encodeUserData(IOURING_RECVMSG, ctx->sockfd);
219 }
220 
completion(struct ctx * ctx,struct io_uring_cqe * cqe)221 static void completion(struct ctx *ctx, struct io_uring_cqe *cqe)
222 {
223 	char type;
224 	int  fd;
225 	int  res = cqe->res;
226 
227 	decodeUserData(cqe->user_data, &type, &fd);
228 	if (res < 0) {
229 		fprintf(stderr, "unexpected %s failure: (%d) %s\n",
230 			opTypeToStr(type), -res, strerror(-res));
231 		abort();
232 	}
233 
234 	switch (type) {
235 	case IOURING_SENDMSG:
236 		receivePing(ctx);
237 		--ctx->num_pings;
238 		break;
239 	case IOURING_RECVMSG:
240 		ctx->iov.iov_len = res;
241 		sendPing(ctx);
242 		if (!ctx->napi_check)
243 			reportNapi(ctx);
244 		break;
245 	default:
246 		fprintf(stderr, "unexpected %s completion\n",
247 			opTypeToStr(type));
248 		abort();
249 		break;
250 	}
251 }
252 
main(int argc,char * argv[])253 int main(int argc, char *argv[])
254 {
255 	int flag;
256 	struct ctx       ctx;
257 	struct __kernel_timespec *tsPtr;
258 	struct __kernel_timespec ts;
259 	struct io_uring_params params;
260 	struct io_uring_napi napi;
261 	int ret, af;
262 
263 	memset(&opt, 0, sizeof(struct options));
264 
265 	// Process flags.
266 	while ((flag = getopt_long(argc, argv, ":lhs:bua:n:p:t:6d:", longopts, NULL)) != -1) {
267 		switch (flag) {
268 		case 'a':
269 			strcpy(opt.addr, optarg);
270 			break;
271 		case 'b':
272 			opt.busy_loop = true;
273 			break;
274 		case 'h':
275 			printUsage(argv[0]);
276 			exit(0);
277 			break;
278 		case 'l':
279 			opt.listen = true;
280 			break;
281 		case 'n':
282 			opt.num_pings = atoi(optarg) + 1;
283 			break;
284 		case 'p':
285 			strcpy(opt.port, optarg);
286 			break;
287 		case 's':
288 			opt.sq_poll = !!atoi(optarg);
289 			break;
290 		case 't':
291 			opt.timeout = atoi(optarg);
292 			break;
293 		case 'u':
294 			opt.prefer_busy_poll = true;
295 			break;
296 		case '6':
297 			opt.ipv6 = true;
298 			break;
299 		case 'd':
300 			opt.defer_tw = !!atoi(optarg);
301 			break;
302 		case ':':
303 			printError("Missing argument", optopt);
304 			printUsage(argv[0]);
305 			exit(-1);
306 			break;
307 		case '?':
308 			printError("Unrecognized option", optopt);
309 			printUsage(argv[0]);
310 			exit(-1);
311 			break;
312 
313 		default:
314 			fprintf(stderr, "Fatal: Unexpected case in CmdLineProcessor switch()\n");
315 			exit(-1);
316 			break;
317 		}
318 	}
319 
320 	if (strlen(opt.addr) == 0) {
321 		fprintf(stderr, "address option is mandatory\n");
322 		printUsage(argv[0]);
323 		exit(1);
324 	}
325 
326 	if (opt.ipv6) {
327 		af = AF_INET6;
328 		ctx.saddr6.sin6_port   = htons(atoi(opt.port));
329 		ctx.saddr6.sin6_family = AF_INET6;
330 	} else {
331 		af = AF_INET;
332 		ctx.saddr.sin_port   = htons(atoi(opt.port));
333 		ctx.saddr.sin_family = AF_INET;
334 	}
335 
336 	if (opt.ipv6)
337 		ret = inet_pton(AF_INET6, opt.addr, &ctx.saddr6.sin6_addr);
338 	else
339 		ret = inet_pton(AF_INET, opt.addr, &ctx.saddr.sin_addr);
340 	if (ret <= 0) {
341 		fprintf(stderr, "inet_pton error for %s\n", optarg);
342 		printUsage(argv[0]);
343 		exit(1);
344 	}
345 
346 	// Connect to server.
347 	fprintf(stdout, "Listening %s : %s...\n", opt.addr, opt.port);
348 
349 	if ((ctx.sockfd = socket(af, SOCK_DGRAM, 0)) < 0) {
350 		fprintf(stderr, "socket() failed: (%d) %s\n", errno, strerror(errno));
351 		exit(1);
352 	}
353 
354 	if (opt.ipv6)
355 		ret = bind(ctx.sockfd, (struct sockaddr *)&ctx.saddr6, sizeof(struct sockaddr_in6));
356 	else
357 		ret = bind(ctx.sockfd, (struct sockaddr *)&ctx.saddr, sizeof(struct sockaddr_in));
358 	if (ret < 0) {
359 		fprintf(stderr, "bind() failed: (%d) %s\n", errno, strerror(errno));
360 		exit(1);
361 	}
362 
363 	// Setup ring.
364 	memset(&params, 0, sizeof(params));
365 	memset(&ts, 0, sizeof(ts));
366 	memset(&napi, 0, sizeof(napi));
367 
368 	params.flags = IORING_SETUP_SINGLE_ISSUER;
369 	if (opt.defer_tw) {
370 		params.flags |= IORING_SETUP_DEFER_TASKRUN;
371 	} else if (opt.sq_poll) {
372 		params.flags = IORING_SETUP_SQPOLL;
373 		params.sq_thread_idle = 50;
374 	} else {
375 		params.flags |= IORING_SETUP_COOP_TASKRUN;
376 	}
377 
378 	ret = io_uring_queue_init_params(RINGSIZE, &ctx.ring, &params);
379 	if (ret) {
380 		fprintf(stderr, "io_uring_queue_init_params() failed: (%d) %s\n",
381 			ret, strerror(-ret));
382 		exit(1);
383 	}
384 
385 	if (opt.timeout || opt.prefer_busy_poll) {
386 		napi.prefer_busy_poll = opt.prefer_busy_poll;
387 		napi.busy_poll_to = opt.timeout;
388 
389 		ret = io_uring_register_napi(&ctx.ring, &napi);
390 		if (ret) {
391 			fprintf(stderr, "io_uring_register_napi: %d\n", ret);
392 			exit(1);
393 		}
394 	}
395 
396 	if (opt.busy_loop)
397 		tsPtr = &ts;
398 	else
399 		tsPtr = NULL;
400 
401 	// Use realtime scheduler.
402 	setProcessScheduler();
403 
404 	// Copy payload.
405 	clock_gettime(CLOCK_REALTIME, &ctx.ts);
406 
407 	// Setup context.
408 	ctx.napi_check = false;
409 	ctx.buffer_len = sizeof(struct timespec);
410 	ctx.num_pings  = opt.num_pings;
411 
412 	// Receive initial message to get napi id.
413 	receivePing(&ctx);
414 
415 	while (ctx.num_pings != 0) {
416 		int res;
417 		unsigned int num_completed = 0;
418 		unsigned int head;
419 		struct io_uring_cqe *cqe;
420 
421 		do {
422 			res = io_uring_submit_and_wait_timeout(&ctx.ring, &cqe, 1, tsPtr, NULL);
423 			if (res >= 0)
424 				break;
425 			else if (res == -ETIME)
426 				continue;
427 			fprintf(stderr, "submit_and_wait: %d\n", res);
428 			exit(1);
429 		} while (1);
430 
431 		io_uring_for_each_cqe(&ctx.ring, head, cqe) {
432 			++num_completed;
433 			completion(&ctx, cqe);
434 		}
435 
436 		if (num_completed)
437 			io_uring_cq_advance(&ctx.ring, num_completed);
438 	}
439 
440 	// Clean up.
441 	if (opt.timeout || opt.prefer_busy_poll) {
442 		ret = io_uring_unregister_napi(&ctx.ring, &napi);
443 		if (ret)
444 			fprintf(stderr, "io_uring_unregister_napi: %d\n", ret);
445 	}
446 
447 	io_uring_queue_exit(&ctx.ring);
448 	close(ctx.sockfd);
449 	return 0;
450 }
451