1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Sample program that can act either as a packet sink, where it just receives
4  * packets and doesn't do anything with them, or it can act as a proxy where it
5  * receives packets and then sends them to a new destination. The proxy can
6  * be unidirectional (-B0), or bi-directional (-B1).
7  *
8  * Examples:
9  *
10  * Act as a proxy, listening on port 4444, and send data to 192.168.2.6 on port
11  * 4445. Use multishot receive, DEFER_TASKRUN, and fixed files
12  *
13  * 	./proxy -m1 -r4444 -H 192.168.2.6 -p4445
14  *
15  * Same as above, but utilize send bundles (-C1, requires -u1 send_ring) as well
16  * with ring provided send buffers, and recv bundles (-c1).
17  *
18  * 	./proxy -m1 -c1 -u1 -C1 -r4444 -H 192.168.2.6 -p4445
19  *
20  * Act as a bi-directional proxy, listening on port 8888, and send data back
21  * and forth between host and 192.168.2.6 on port 22. Use multishot receive,
22  * DEFER_TASKRUN, fixed files, and buffers of size 1500.
23  *
24  * 	./proxy -m1 -B1 -b1500 -r8888 -H 192.168.2.6 -p22
25  *
26  * Act as a sink, listening on port 4445, using multishot receive, DEFER_TASKRUN,
27  * and fixed files:
28  *
29  * 	./proxy -m1 -s1 -r4445
30  *
31  * Run with -h to see a list of options, and their defaults.
32  *
33  * (C) 2024 Jens Axboe <axboe@kernel.dk>
34  *
35  */
36 #include <fcntl.h>
37 #include <stdint.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <arpa/inet.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/socket.h>
45 #include <sys/time.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48 #include <linux/mman.h>
49 #include <locale.h>
50 #include <assert.h>
51 #include <pthread.h>
52 #include <liburing.h>
53 
54 #include "proxy.h"
55 #include "helpers.h"
56 
57 /*
58  * Will go away once/if bundles are upstreamed and we put the generic
59  * definitions in the kernel header.
60  */
61 #ifndef IORING_RECVSEND_BUNDLE
62 #define IORING_RECVSEND_BUNDLE		(1U << 4)
63 #endif
64 #ifndef IORING_FEAT_SEND_BUF_SELECT
65 #define IORING_FEAT_SEND_BUF_SELECT	(1U << 14)
66 #endif
67 
68 static int cur_bgid = 1;
69 static int nr_conns;
70 static int open_conns;
71 static long page_size;
72 
73 static unsigned long event_loops;
74 static unsigned long events;
75 
76 static int recv_mshot = 1;
77 static int sqpoll;
78 static int defer_tw = 1;
79 static int is_sink;
80 static int fixed_files = 1;
81 static char *host = "192.168.3.2";
82 static int send_port = 4445;
83 static int receive_port = 4444;
84 static int buf_size = 32;
85 static int buf_ring_inc;
86 static int bidi;
87 static int ipv6;
88 static int napi;
89 static int napi_timeout;
90 static int wait_batch = 1;
91 static int wait_usec = 1000000;
92 static int rcv_msg;
93 static int snd_msg;
94 static int snd_zc;
95 static int send_ring = -1;
96 static int snd_bundle;
97 static int rcv_bundle;
98 static int use_huge;
99 static int ext_stat;
100 static int verbose;
101 
102 static int nr_bufs = 256;
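/*
 * br_mask is the provided buffer ring mask passed to io_uring_buf_ring_add();
 * it is presumably initialized to io_uring_buf_ring_mask(nr_bufs) during
 * startup (that setup code is outside this excerpt).
 */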
103 static int br_mask;
104 
105 static int ring_size = 128;
106 
107 static pthread_mutex_t thread_lock;
108 static struct timeval last_housekeeping;
109 
110 /*
111  * For sendmsg/recvmsg. recvmsg just has a single vec, sendmsg will have
112  * two vecs - one that is currently submitted and being sent, and one that
113  * is being prepared. When a new sendmsg is issued, we'll swap which one we
114  * use. For send, even though we don't pass in the iovec itself, we use the
115  * vec to serialize the sends to avoid reordering.
116  */
117 struct msg_vec {
118 	struct iovec *iov;
119 	/* length of allocated vec */
120 	int vec_size;
121 	/* length currently being used */
122 	int iov_len;
123 	/* only for send, current index we're processing */
124 	int cur_iov;
125 };
126 
127 struct io_msg {
128 	struct msghdr msg;
129 	struct msg_vec vecs[2];
130 	/* current msg_vec being prepared */
131 	int vec_index;
132 };
133 
134 /*
135  * Per socket stats per connection. For bi-directional, we'll have both
136  * sends and receives on each socket; this helps track them separately.
137  * For sink or one directional, each of the two stats will be only sends
138  * or receives, not both.
139  */
140 struct conn_dir {
141 	int index;
142 
143 	int pending_shutdown;
144 	int pending_send;
145 	int pending_recv;
146 
147 	int snd_notif;
148 
149 	int out_buffers;
150 
151 	int rcv, rcv_shrt, rcv_enobufs, rcv_mshot;
152 	int snd, snd_shrt, snd_enobufs, snd_busy, snd_mshot;
153 
154 	int snd_next_bid;
155 	int rcv_next_bid;
156 
157 	int *rcv_bucket;
158 	int *snd_bucket;
159 
160 	unsigned long in_bytes, out_bytes;
161 
162 	/* only ever have a single recv pending */
163 	struct io_msg io_rcv_msg;
164 
165 	/* one send that is inflight, and one being prepared for the next one */
166 	struct io_msg io_snd_msg;
167 };
168 
169 enum {
170 	CONN_F_STARTED		= 1,
171 	CONN_F_DISCONNECTING	= 2,
172 	CONN_F_DISCONNECTED	= 4,
173 	CONN_F_PENDING_SHUTDOWN	= 8,
174 	CONN_F_STATS_SHOWN	= 16,
175 	CONN_F_END_TIME		= 32,
176 	CONN_F_REAPED		= 64,
177 };
178 
179 /*
180  * buffer ring belonging to a connection
181  */
182 struct conn_buf_ring {
183 	struct io_uring_buf_ring *br;
184 	void *buf;
185 	int bgid;
186 };
187 
188 struct conn {
189 	struct io_uring ring;
190 
191 	/* receive side buffer ring, new data arrives here */
192 	struct conn_buf_ring in_br;
193 	/* if send_ring is used, outgoing data to send */
194 	struct conn_buf_ring out_br;
195 
196 	int tid;
197 	int in_fd, out_fd;
198 	int pending_cancels;
199 	int flags;
200 
201 	struct conn_dir cd[2];
202 
203 	struct timeval start_time, end_time;
204 
205 	union {
206 		struct sockaddr_in addr;
207 		struct sockaddr_in6 addr6;
208 	};
209 
210 	pthread_t thread;
211 	pthread_barrier_t startup_barrier;
212 };
213 
214 #define MAX_CONNS	1024
215 static struct conn conns[MAX_CONNS];
216 
217 #define vlog(str, ...) do {						\
218 	if (verbose)							\
219 		printf(str, ##__VA_ARGS__);				\
220 } while (0)
221 
222 static int prep_next_send(struct io_uring *ring, struct conn *c,
223 			  struct conn_dir *cd, int fd);
224 static void *thread_main(void *data);
225 
226 static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
227 {
228 	struct userdata ud = { .val = cqe->user_data };
229 
230 	return &conns[ud.op_tid & TID_MASK];
231 }
232 
233 static struct conn_dir *cqe_to_conn_dir(struct conn *c,
234 					struct io_uring_cqe *cqe)
235 {
236 	int fd = cqe_to_fd(cqe);
237 
238 	return &c->cd[fd != c->in_fd];
239 }
240 
241 static int other_dir_fd(struct conn *c, int fd)
242 {
243 	if (c->in_fd == fd)
244 		return c->out_fd;
245 	return c->in_fd;
246 }
247 
248 /* currently active msg_vec */
249 static struct msg_vec *msg_vec(struct io_msg *imsg)
250 {
251 	return &imsg->vecs[imsg->vec_index];
252 }
253 
254 static struct msg_vec *snd_msg_vec(struct conn_dir *cd)
255 {
256 	return msg_vec(&cd->io_snd_msg);
257 }
258 
259 /*
260  * Goes from accept new connection -> create socket, connect to end
261  * point, prepare recv, on receive do send (unless sink). If either ends
262  * disconnects, we transition to shutdown and then close.
263  */
264 enum {
265 	__ACCEPT	= 1,
266 	__SOCK		= 2,
267 	__CONNECT	= 3,
268 	__RECV		= 4,
269 	__RECVMSG	= 5,
270 	__SEND		= 6,
271 	__SENDMSG	= 7,
272 	__SHUTDOWN	= 8,
273 	__CANCEL	= 9,
274 	__CLOSE		= 10,
275 	__FD_PASS	= 11,
276 	__NOP		= 12,
277 	__STOP		= 13,
278 };
279 
280 struct error_handler {
281 	const char *name;
282 	int (*error_fn)(struct error_handler *, struct io_uring *, struct io_uring_cqe *);
283 };
284 
285 static int recv_error(struct error_handler *err, struct io_uring *ring,
286 		      struct io_uring_cqe *cqe);
287 static int send_error(struct error_handler *err, struct io_uring *ring,
288 		      struct io_uring_cqe *cqe);
289 
290 static int default_error(struct error_handler *err,
291 			 struct io_uring __attribute__((__unused__)) *ring,
292 			 struct io_uring_cqe *cqe)
293 {
294 	struct conn *c = cqe_to_conn(cqe);
295 
296 	fprintf(stderr, "%d: %s error %s\n", c->tid, err->name, strerror(-cqe->res));
297 	fprintf(stderr, "fd=%d, bid=%d\n", cqe_to_fd(cqe), cqe_to_bid(cqe));
298 	return 1;
299 }
300 
301 /*
302  * Move error handling out of the normal handling path, cleanly separating
303  * them. If an opcode doesn't need any error handling, set it to NULL. If
304  * it wants to stop the connection at that point and not do anything else,
305  * then the default handler can be used. Only receive has proper error
306  * handling, as we can get -ENOBUFS which is not a fatal condition. It just
307  * means we need to wait on buffer replenishing before re-arming the receive.
308  */
309 static struct error_handler error_handlers[] = {
310 	{ .name = "NULL",	.error_fn = NULL, },
311 	{ .name = "ACCEPT",	.error_fn = default_error, },
312 	{ .name = "SOCK",	.error_fn = default_error, },
313 	{ .name = "CONNECT",	.error_fn = default_error, },
314 	{ .name = "RECV",	.error_fn = recv_error, },
315 	{ .name = "RECVMSG",	.error_fn = recv_error, },
316 	{ .name = "SEND",	.error_fn = send_error, },
317 	{ .name = "SENDMSG",	.error_fn = send_error, },
318 	{ .name = "SHUTDOWN",	.error_fn = NULL, },
319 	{ .name = "CANCEL",	.error_fn = NULL, },
320 	{ .name = "CLOSE",	.error_fn = NULL, },
321 	{ .name = "FD_PASS",	.error_fn = default_error, },
322 	{ .name = "NOP",	.error_fn = NULL, },
323 	{ .name = "STOP",	.error_fn = default_error, },
324 };
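/*
 * Note: error_handlers[] is indexed by the __* opcode values defined above
 * (__ACCEPT == 1, and so on), with slot 0 unused - keep the two lists in sync.
 */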
325 
326 static void free_buffer_ring(struct io_uring *ring, struct conn_buf_ring *cbr)
327 {
328 	if (!cbr->br)
329 		return;
330 
331 	io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
332 	cbr->br = NULL;
333 	if (use_huge)
334 		munmap(cbr->buf, buf_size * nr_bufs);
335 	else
336 		free(cbr->buf);
337 }
338 
339 static void free_buffer_rings(struct io_uring *ring, struct conn *c)
340 {
341 	free_buffer_ring(ring, &c->in_br);
342 	free_buffer_ring(ring, &c->out_br);
343 }
344 
345 /*
346  * Setup a ring provided buffer ring for each connection. If we get -ENOBUFS
347  * on receive, for multishot receive we'll wait for half the provided buffers
348  * to be returned by pending sends, then re-arm the multishot receive. If
349  * this happens too frequently (see enobufs= stat), then the ring size is
350  * likely too small. Use -nXX to make it bigger. See recv_enobufs().
351  *
352  * The alternative here would be to use the older style provided buffers,
353  * where you simply setup a buffer group and use SQEs with
354  * io_uring_prep_provide_buffers() to add to the pool. But that approach is
355  * slower and has been deprecated in favor of the faster ring provided buffers.
356  */
357 static int setup_recv_ring(struct io_uring *ring, struct conn *c)
358 {
359 	struct conn_buf_ring *cbr = &c->in_br;
360 	int br_flags = 0;
361 	int ret, i;
362 	size_t len;
363 	void *ptr;
364 
365 	len = buf_size * nr_bufs;
366 	if (use_huge) {
367 		cbr->buf = mmap(NULL, len, PROT_READ|PROT_WRITE,
368 				MAP_PRIVATE|MAP_HUGETLB|MAP_HUGE_2MB|MAP_ANONYMOUS,
369 				-1, 0);
370 		if (cbr->buf == MAP_FAILED) {
371 			perror("mmap");
372 			return 1;
373 		}
374 	} else {
375 		if (posix_memalign(&cbr->buf, page_size, len)) {
376 			perror("posix memalign");
377 			return 1;
378 		}
379 	}
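	/*
	 * With incremental buffer ring consumption (buf_ring_inc), the ring is
	 * registered with IOU_PBUF_RING_INC and the kernel may consume a
	 * provided buffer in multiple pieces, setting IORING_CQE_F_BUF_MORE
	 * while it still holds part of the buffer - see recv_inc() and
	 * handle_send_inc() for how completions are handled in that mode.
	 */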
380 	if (buf_ring_inc)
381 		br_flags = IOU_PBUF_RING_INC;
382 	cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, br_flags, &ret);
383 	if (!cbr->br) {
384 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
385 		return 1;
386 	}
387 
388 	ptr = cbr->buf;
389 	for (i = 0; i < nr_bufs; i++) {
390 		vlog("%d: add bid %d, data %p\n", c->tid, i, ptr);
391 		io_uring_buf_ring_add(cbr->br, ptr, buf_size, i, br_mask, i);
392 		ptr += buf_size;
393 	}
394 	io_uring_buf_ring_advance(cbr->br, nr_bufs);
395 	printf("%d: recv buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
396 	return 0;
397 }
398 
399 /*
400  * If 'send_ring' is used and the kernel supports it, we can skip serializing
401  * sends as the data will be ordered regardless. This reduces the send handling
402  * complexity, as buffers can always be added to the outgoing ring and will be
403  * processed in the order in which they were added.
404  */
405 static int setup_send_ring(struct io_uring *ring, struct conn *c)
406 {
407 	struct conn_buf_ring *cbr = &c->out_br;
408 	int br_flags = 0;
409 	int ret;
410 
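	/*
	 * Unlike the receive side, no buffers are added here up front - the
	 * outgoing ring starts out empty and is filled as receives complete,
	 * see send_append() and recv_bids().
	 */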
411 	if (buf_ring_inc)
412 		br_flags = IOU_PBUF_RING_INC;
413 	cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, br_flags, &ret);
414 	if (!cbr->br) {
415 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
416 		return 1;
417 	}
418 
419 	printf("%d: send buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
420 	return 0;
421 }
422 
423 static int setup_send_zc(struct io_uring *ring, struct conn *c)
424 {
425 	struct iovec *iovs;
426 	void *buf;
427 	int i, ret;
428 
429 	if (snd_msg)
430 		return 0;
431 
432 	buf = c->in_br.buf;
433 	iovs = calloc(nr_bufs, sizeof(struct iovec));
434 	for (i = 0; i < nr_bufs; i++) {
435 		iovs[i].iov_base = buf;
436 		iovs[i].iov_len = buf_size;
437 		buf += buf_size;
438 	}
439 
440 	ret = io_uring_register_buffers(ring, iovs, nr_bufs);
441 	if (ret) {
442 		fprintf(stderr, "failed registering buffers: %d\n", ret);
443 		free(iovs);
444 		return ret;
445 	}
446 	free(iovs);
447 	return 0;
448 }
449 
450 /*
451  * Setup an input and output buffer ring.
452  */
453 static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
454 {
455 	int ret;
456 
457 	/* no locking needed on cur_bgid, parent serializes setup */
458 	c->in_br.bgid = cur_bgid++;
459 	c->out_br.bgid = cur_bgid++;
460 	c->out_br.br = NULL;
461 
462 	ret = setup_recv_ring(ring, c);
463 	if (ret)
464 		return ret;
465 	if (is_sink)
466 		return 0;
467 	if (snd_zc) {
468 		ret = setup_send_zc(ring, c);
469 		if (ret)
470 			return ret;
471 	}
472 	if (send_ring) {
473 		ret = setup_send_ring(ring, c);
474 		if (ret) {
475 			free_buffer_ring(ring, &c->in_br);
476 			return ret;
477 		}
478 	}
479 
480 	return 0;
481 }
482 
483 struct bucket_stat {
484 	int nr_packets;
485 	int count;
486 };
487 
488 static int stat_cmp(const void *p1, const void *p2)
489 {
490 	const struct bucket_stat *b1 = p1;
491 	const struct bucket_stat *b2 = p2;
492 
493 	if (b1->count < b2->count)
494 		return 1;
495 	else if (b1->count > b2->count)
496 		return -1;
497 	return 0;
498 }
499 
500 static void show_buckets(struct conn_dir *cd)
501 {
502 	unsigned long snd_total, rcv_total;
503 	struct bucket_stat *rstat, *sstat;
504 	int i;
505 
506 	if (!cd->rcv_bucket || !cd->snd_bucket)
507 		return;
508 
509 	rstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
510 	sstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
511 
512 	snd_total = rcv_total = 0;
513 	for (i = 0; i <= nr_bufs; i++) {
514 		snd_total += cd->snd_bucket[i];
515 		sstat[i].nr_packets = i;
516 		sstat[i].count = cd->snd_bucket[i];
517 		rcv_total += cd->rcv_bucket[i];
518 		rstat[i].nr_packets = i;
519 		rstat[i].count = cd->rcv_bucket[i];
520 	}
521 
522 	if (!snd_total && !rcv_total) {
523 		free(sstat);
524 		free(rstat);
		return;
525 	}
526 	if (snd_total)
527 		qsort(sstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
528 	if (rcv_total)
529 		qsort(rstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
530 
531 	printf("\t Packets per recv/send:\n");
532 	for (i = 0; i <= nr_bufs; i++) {
533 		double snd_prc = 0.0, rcv_prc = 0.0;
534 		if (!rstat[i].count && !sstat[i].count)
535 			continue;
536 		if (rstat[i].count)
537 			rcv_prc = 100.0 * (rstat[i].count / (double) rcv_total);
538 		if (sstat[i].count)
539 			snd_prc = 100.0 * (sstat[i].count / (double) snd_total);
540 		printf("\t bucket(%3d/%3d): rcv=%u (%.2f%%) snd=%u (%.2f%%)\n",
541 				rstat[i].nr_packets, sstat[i].nr_packets,
542 				rstat[i].count, rcv_prc,
543 				sstat[i].count, snd_prc);
544 	}
545 
546 	free(sstat);
547 	free(rstat);
548 }
549 
550 static void __show_stats(struct conn *c)
551 {
552 	unsigned long msec, qps;
553 	unsigned long bytes, bw;
554 	struct conn_dir *cd;
555 	int i;
556 
557 	if (c->flags & (CONN_F_STATS_SHOWN | CONN_F_REAPED))
558 		return;
559 	if (!(c->flags & CONN_F_STARTED))
560 		return;
561 
562 	if (!(c->flags & CONN_F_END_TIME))
563 		gettimeofday(&c->end_time, NULL);
564 
565 	msec = (c->end_time.tv_sec - c->start_time.tv_sec) * 1000;
566 	msec += (c->end_time.tv_usec - c->start_time.tv_usec) / 1000;
567 
568 	qps = 0;
569 	for (i = 0; i < 2; i++)
570 		qps += c->cd[i].rcv + c->cd[i].snd;
571 
572 	if (!qps)
573 		return;
574 
575 	if (msec)
576 		qps = (qps * 1000) / msec;
577 
578 	printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
579 					c->in_fd, c->out_fd, qps, msec);
580 
581 	bytes = 0;
582 	for (i = 0; i < 2; i++) {
583 		cd = &c->cd[i];
584 
585 		if (!cd->in_bytes && !cd->out_bytes && !cd->snd && !cd->rcv)
586 			continue;
587 
588 		bytes += cd->in_bytes;
589 		bytes += cd->out_bytes;
590 
591 		printf("\t%3d: rcv=%u (short=%u, enobufs=%d), snd=%u (short=%u,"
592 			" busy=%u, enobufs=%d)\n", i, cd->rcv, cd->rcv_shrt,
593 			cd->rcv_enobufs, cd->snd, cd->snd_shrt, cd->snd_busy,
594 			cd->snd_enobufs);
595 		printf("\t   : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
596 			cd->in_bytes, cd->in_bytes >> 10,
597 			cd->out_bytes, cd->out_bytes >> 10);
598 		printf("\t   : mshot_rcv=%d, mshot_snd=%d\n", cd->rcv_mshot,
599 			cd->snd_mshot);
600 		show_buckets(cd);
601 
602 	}
603 	if (msec) {
604 		bytes *= 8UL;
605 		bw = bytes / 1000;
606 		bw /= msec;
607 		printf("\tBW=%'luMbit\n", bw);
608 	}
609 
610 	c->flags |= CONN_F_STATS_SHOWN;
611 }
612 
613 static void show_stats(void)
614 {
615 	float events_per_loop = 0.0;
616 	static int stats_shown;
617 	int i;
618 
619 	if (stats_shown)
620 		return;
621 
622 	if (events)
623 		events_per_loop = (float) events / (float) event_loops;
624 
625 	printf("Event loops: %lu, events %lu, events per loop %.2f\n", event_loops,
626 							events, events_per_loop);
627 
628 	for (i = 0; i < MAX_CONNS; i++) {
629 		struct conn *c = &conns[i];
630 
631 		__show_stats(c);
632 	}
633 	stats_shown = 1;
634 }
635 
636 static void sig_int(int __attribute__((__unused__)) sig)
637 {
638 	printf("\n");
639 	show_stats();
640 	exit(1);
641 }
642 
643 /*
644  * Special cased for SQPOLL only, as we don't control when SQEs are consumed if
645  * that is used. Hence we may need to wait for the SQPOLL thread to keep up
646  * until we can get a new SQE. All other cases will break immediately, with a
647  * fresh SQE.
648  *
649  * If we grossly undersized our SQ ring, getting a NULL sqe can happen even
650  * for the !SQPOLL case if we're handling a lot of CQEs in our event loop
651  * and multishot isn't used. We can do io_uring_submit() to flush what we
652  * have here. Only caveat here is that if linked requests are used, SQEs
653  * would need to be allocated upfront as a link chain is only valid within
654  * a single submission cycle.
655  */
656 static struct io_uring_sqe *get_sqe(struct io_uring *ring)
657 {
658 	struct io_uring_sqe *sqe;
659 
660 	do {
661 		sqe = io_uring_get_sqe(ring);
662 		if (sqe)
663 			break;
664 		if (!sqpoll)
665 			io_uring_submit(ring);
666 		else
667 			io_uring_sqring_wait(ring);
668 	} while (1);
669 
670 	return sqe;
671 }
672 
673 /*
674  * See __encode_userdata() for how we encode sqe->user_data, which is passed
675  * back as cqe->user_data at completion time.
676  */
677 static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
678 			    int bid, int fd)
679 {
680 	__encode_userdata(sqe, c->tid, op, bid, fd);
681 }
682 
683 static void __submit_receive(struct io_uring *ring, struct conn *c,
684 			     struct conn_dir *cd, int fd)
685 {
686 	struct conn_buf_ring *cbr = &c->in_br;
687 	struct io_uring_sqe *sqe;
688 
689 	vlog("%d: submit receive fd=%d\n", c->tid, fd);
690 
691 	assert(!cd->pending_recv);
692 	cd->pending_recv = 1;
693 
694 	/*
695 	 * For both recv and multishot receive, we use the ring provided
696 	 * buffers. These are handed to the application ahead of time, and
697 	 * are consumed when a receive triggers. Note that the address and
698 	 * length of the receive are set to NULL/0, and we assign the
699 	 * sqe->buf_group to tell the kernel which buffer group ID to pick
700 	 * a buffer from. Finally, IOSQE_BUFFER_SELECT is set to tell the
701 	 * kernel that we want a buffer picked for this request, we are not
702 	 * passing one in with the request.
703 	 */
704 	sqe = get_sqe(ring);
705 	if (rcv_msg) {
706 		struct io_msg *imsg = &cd->io_rcv_msg;
707 		struct msghdr *msg = &imsg->msg;
708 
709 		memset(msg, 0, sizeof(*msg));
710 		msg->msg_iov = msg_vec(imsg)->iov;
711 		msg->msg_iovlen = msg_vec(imsg)->iov_len;
712 
713 		if (recv_mshot) {
714 			cd->rcv_mshot++;
715 			io_uring_prep_recvmsg_multishot(sqe, fd, &imsg->msg, 0);
716 		} else {
717 			io_uring_prep_recvmsg(sqe, fd, &imsg->msg, 0);
718 		}
719 	} else {
720 		if (recv_mshot) {
721 			cd->rcv_mshot++;
722 			io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0);
723 		} else {
724 			io_uring_prep_recv(sqe, fd, NULL, 0, 0);
725 		}
726 	}
727 	encode_userdata(sqe, c, __RECV, 0, fd);
728 	sqe->buf_group = cbr->bgid;
729 	sqe->flags |= IOSQE_BUFFER_SELECT;
730 	if (fixed_files)
731 		sqe->flags |= IOSQE_FIXED_FILE;
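	/*
	 * Receive bundles (IORING_RECVSEND_BUNDLE) let a single receive
	 * consume multiple buffers from the provided buffer ring and complete
	 * them with one CQE, cutting down on per-buffer completion overhead.
	 */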
732 	if (rcv_bundle)
733 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
734 }
735 
736 /*
737  * One directional just arms receive on our in_fd
738  */
739 static void submit_receive(struct io_uring *ring, struct conn *c)
740 {
741 	__submit_receive(ring, c, &c->cd[0], c->in_fd);
742 }
743 
744 /*
745  * Bi-directional arms receive on both in and out fd
746  */
747 static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
748 {
749 	__submit_receive(ring, c, &c->cd[0], c->in_fd);
750 	__submit_receive(ring, c, &c->cd[1], c->out_fd);
751 }
752 
753 /*
754  * We hit -ENOBUFS, which means that we ran out of buffers in our current
755  * provided buffer group. This can happen if there's an imbalance between the
756  * receives coming in and the sends being processed, particularly with multishot
757  * receive as they can trigger very quickly. If this happens, defer arming a
758  * new receive until we've replenished half of the buffer pool by processing
759  * pending sends.
760  */
761 static void recv_enobufs(struct io_uring *ring, struct conn *c,
762 			 struct conn_dir *cd, int fd)
763 {
764 	vlog("%d: enobufs hit\n", c->tid);
765 
766 	cd->rcv_enobufs++;
767 
768 	/*
769 	 * If we're a sink, just re-arm the receive. If we're not, prepare the
770 	 * next send first; if no send got queued, re-arm the receive here,
771 	 * otherwise the completing send will kick the recv re-arm.
772 	 */
773 	if (!is_sink) {
774 		int do_recv_arm = 1;
775 
776 		if (!cd->pending_send)
777 			do_recv_arm = !prep_next_send(ring, c, cd, fd);
778 		if (do_recv_arm)
779 			__submit_receive(ring, c, &c->cd[0], c->in_fd);
780 	} else {
781 		__submit_receive(ring, c, &c->cd[0], c->in_fd);
782 	}
783 }
784 
785 /*
786  * Kill this socket - submit a shutdown and link a close to it. We don't
787  * care about shutdown status, so mark it as not needing to post a CQE unless
788  * it fails.
789  */
790 static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
791 {
792 	struct io_uring_sqe *sqe1, *sqe2;
793 
794 	/*
795 	 * On the off chance that we run out of SQEs after the first one,
796 	 * grab two upfront. This is to prevent our link from breaking if
797 	 * get_sqe() ends up doing submissions to free up an SQE, as links
798 	 * are not valid across separate submissions.
799 	 */
800 	sqe1 = get_sqe(ring);
801 	sqe2 = get_sqe(ring);
802 
803 	io_uring_prep_shutdown(sqe1, fd, SHUT_RDWR);
804 	if (fixed_files)
805 		sqe1->flags |= IOSQE_FIXED_FILE;
806 	sqe1->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
807 	encode_userdata(sqe1, c, __SHUTDOWN, 0, fd);
808 
809 	if (fixed_files)
810 		io_uring_prep_close_direct(sqe2, fd);
811 	else
812 		io_uring_prep_close(sqe2, fd);
813 	encode_userdata(sqe2, c, __CLOSE, 0, fd);
814 }
815 
816 /*
817  * This connection is going away - queue a cancel for any receive we still
818  * have pending on this ring. For completeness, we issue a cancel for any
819  * request we have pending for both in_fd and out_fd.
820  */
821 static void queue_cancel(struct io_uring *ring, struct conn *c)
822 {
823 	struct io_uring_sqe *sqe;
824 	int flags = 0;
825 
826 	if (fixed_files)
827 		flags |= IORING_ASYNC_CANCEL_FD_FIXED;
828 
829 	sqe = get_sqe(ring);
830 	io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
831 	encode_userdata(sqe, c, __CANCEL, 0, c->in_fd);
832 	c->pending_cancels++;
833 
834 	if (c->out_fd != -1) {
835 		sqe = get_sqe(ring);
836 		io_uring_prep_cancel_fd(sqe, c->out_fd, flags);
837 		encode_userdata(sqe, c, __CANCEL, 0, c->out_fd);
838 		c->pending_cancels++;
839 	}
840 
841 	io_uring_submit(ring);
842 }
843 
844 static int pending_shutdown(struct conn *c)
845 {
846 	return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
847 }
848 
849 static bool should_shutdown(struct conn *c)
850 {
851 	int i;
852 
853 	if (!pending_shutdown(c))
854 		return false;
855 	if (is_sink)
856 		return true;
857 	if (!bidi)
858 		return c->cd[0].in_bytes == c->cd[1].out_bytes;
859 
860 	for (i = 0; i < 2; i++) {
861 		if (c->cd[0].rcv != c->cd[1].snd)
862 			return false;
863 		if (c->cd[1].rcv != c->cd[0].snd)
864 			return false;
865 	}
866 
867 	return true;
868 }
869 
870 /*
871  * Close this connection - send a ring message to the connection with intent
872  * to stop. When the client gets the message, it will initiate the stop.
873  */
874 static void __close_conn(struct io_uring *ring, struct conn *c)
875 {
876 	struct io_uring_sqe *sqe;
877 	uint64_t user_data;
878 
879 	printf("Client %d: queueing stop\n", c->tid);
880 
881 	user_data = __raw_encode(c->tid, __STOP, 0, 0);
882 	sqe = io_uring_get_sqe(ring);
883 	io_uring_prep_msg_ring(sqe, c->ring.ring_fd, 0, user_data, 0);
884 	encode_userdata(sqe, c, __NOP, 0, 0);
885 	io_uring_submit(ring);
886 }
887 
888 static void close_cd(struct conn *c, struct conn_dir *cd)
889 {
890 	cd->pending_shutdown = 1;
891 
892 	if (cd->pending_send)
893 		return;
894 
895 	if (!(c->flags & CONN_F_PENDING_SHUTDOWN)) {
896 		gettimeofday(&c->end_time, NULL);
897 		c->flags |= CONN_F_PENDING_SHUTDOWN | CONN_F_END_TIME;
898 	}
899 }
900 
901 /*
902  * We're done with this buffer, add it back to our pool so the kernel is
903  * free to use it again.
904  */
905 static int replenish_buffer(struct conn_buf_ring *cbr, int bid, int offset)
906 {
907 	void *this_buf = cbr->buf + bid * buf_size;
908 
909 	assert(bid < nr_bufs);
910 
911 	io_uring_buf_ring_add(cbr->br, this_buf, buf_size, bid, br_mask, offset);
912 	return buf_size;
913 }
914 
915 /*
916  * Iterate buffers from '*bid' and with a total size of 'bytes' and add them
917  * back to our receive ring so they can be reused for new receives.
918  */
919 static int replenish_buffers(struct conn *c, int *bid, int bytes)
920 {
921 	struct conn_buf_ring *cbr = &c->in_br;
922 	int nr_packets = 0;
923 
924 	while (bytes) {
925 		int this_len = replenish_buffer(cbr, *bid, nr_packets);
926 
927 		if (this_len > bytes)
928 			this_len = bytes;
929 		bytes -= this_len;
930 
931 		*bid = (*bid + 1) & (nr_bufs - 1);
932 		nr_packets++;
933 	}
934 
935 	io_uring_buf_ring_advance(cbr->br, nr_packets);
936 	return nr_packets;
937 }
938 
939 static void free_mvec(struct msg_vec *mvec)
940 {
941 	free(mvec->iov);
942 	mvec->iov = NULL;
943 }
944 
945 static void init_mvec(struct msg_vec *mvec)
946 {
947 	memset(mvec, 0, sizeof(*mvec));
948 	mvec->iov = malloc(sizeof(struct iovec));
949 	mvec->vec_size = 1;
950 }
951 
952 static void init_msgs(struct conn_dir *cd)
953 {
954 	memset(&cd->io_snd_msg, 0, sizeof(cd->io_snd_msg));
955 	memset(&cd->io_rcv_msg, 0, sizeof(cd->io_rcv_msg));
956 	init_mvec(&cd->io_snd_msg.vecs[0]);
957 	init_mvec(&cd->io_snd_msg.vecs[1]);
958 	init_mvec(&cd->io_rcv_msg.vecs[0]);
959 }
960 
961 static void free_msgs(struct conn_dir *cd)
962 {
963 	free_mvec(&cd->io_snd_msg.vecs[0]);
964 	free_mvec(&cd->io_snd_msg.vecs[1]);
965 	free_mvec(&cd->io_rcv_msg.vecs[0]);
966 }
967 
968 /*
969  * Multishot accept completion triggered. If we're acting as a sink, we're
970  * good to go. Just issue a receive for that case. If we're acting as a proxy,
971  * then start opening a socket that we can use to connect to the other end.
972  */
973 static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
974 {
975 	struct conn *c;
976 	int i;
977 
978 	if (nr_conns == MAX_CONNS) {
979 		fprintf(stderr, "max clients reached %d\n", nr_conns);
980 		return 1;
981 	}
982 
983 	/* main thread handles this, which is obviously serialized */
984 	c = &conns[nr_conns];
985 	c->tid = nr_conns++;
986 	c->in_fd = -1;
987 	c->out_fd = -1;
988 
989 	for (i = 0; i < 2; i++) {
990 		struct conn_dir *cd = &c->cd[i];
991 
992 		cd->index = i;
993 		cd->snd_next_bid = -1;
994 		cd->rcv_next_bid = -1;
995 		if (ext_stat) {
996 			cd->rcv_bucket = calloc(nr_bufs + 1, sizeof(int));
997 			cd->snd_bucket = calloc(nr_bufs + 1, sizeof(int));
998 		}
999 		init_msgs(cd);
1000 	}
1001 
1002 	printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
1003 	gettimeofday(&c->start_time, NULL);
1004 
1005 	pthread_barrier_init(&c->startup_barrier, NULL, 2);
1006 	pthread_create(&c->thread, NULL, thread_main, c);
1007 
1008 	/*
1009 	 * Wait for thread to have its ring setup, then either assign the fd
1010 	 * if it's non-fixed, or pass the fixed one
1011 	 */
1012 	pthread_barrier_wait(&c->startup_barrier);
1013 	if (!fixed_files) {
1014 		c->in_fd = cqe->res;
1015 	} else {
1016 		struct io_uring_sqe *sqe;
1017 		uint64_t user_data;
1018 
1019 		/*
1020 		 * Ring has just been setup, we'll use index 0 as the descriptor
1021 		 * value.
1022 		 */
1023 		user_data = __raw_encode(c->tid, __FD_PASS, 0, 0);
1024 		sqe = io_uring_get_sqe(ring);
1025 		io_uring_prep_msg_ring_fd(sqe, c->ring.ring_fd, cqe->res, 0,
1026 						user_data, 0);
1027 		encode_userdata(sqe, c, __NOP, 0, cqe->res);
1028 	}
1029 
1030 	return 0;
1031 }
1032 
1033 /*
1034  * Our socket request completed, issue a connect request to the other end.
1035  */
1036 static int handle_sock(struct io_uring *ring, struct io_uring_cqe *cqe)
1037 {
1038 	struct conn *c = cqe_to_conn(cqe);
1039 	struct io_uring_sqe *sqe;
1040 	int ret;
1041 
1042 	vlog("%d: sock: res=%d\n", c->tid, cqe->res);
1043 
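	/*
	 * For fixed files, cqe->res is the direct descriptor index that the
	 * kernel allocated for the socket; otherwise it's a regular fd.
	 */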
1044 	c->out_fd = cqe->res;
1045 
1046 	if (ipv6) {
1047 		memset(&c->addr6, 0, sizeof(c->addr6));
1048 		c->addr6.sin6_family = AF_INET6;
1049 		c->addr6.sin6_port = htons(send_port);
1050 		ret = inet_pton(AF_INET6, host, &c->addr6.sin6_addr);
1051 	} else {
1052 		memset(&c->addr, 0, sizeof(c->addr));
1053 		c->addr.sin_family = AF_INET;
1054 		c->addr.sin_port = htons(send_port);
1055 		ret = inet_pton(AF_INET, host, &c->addr.sin_addr);
1056 	}
1057 	if (ret <= 0) {
1058 		if (!ret)
1059 			fprintf(stderr, "host not in right format\n");
1060 		else
1061 			perror("inet_pton");
1062 		return 1;
1063 	}
1064 
1065 	sqe = get_sqe(ring);
1066 	if (ipv6) {
1067 		io_uring_prep_connect(sqe, c->out_fd,
1068 					(struct sockaddr *) &c->addr6,
1069 					sizeof(c->addr6));
1070 	} else {
1071 		io_uring_prep_connect(sqe, c->out_fd,
1072 					(struct sockaddr *) &c->addr,
1073 					sizeof(c->addr));
1074 	}
1075 	encode_userdata(sqe, c, __CONNECT, 0, c->out_fd);
1076 	if (fixed_files)
1077 		sqe->flags |= IOSQE_FIXED_FILE;
1078 	return 0;
1079 }
1080 
1081 /*
1082  * Connection to the other end is done, submit a receive to start receiving
1083  * data. If we're a bidirectional proxy, issue a receive on both ends. If not,
1084  * then just a single recv will do.
1085  */
1086 static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
1087 {
1088 	struct conn *c = cqe_to_conn(cqe);
1089 
1090 	pthread_mutex_lock(&thread_lock);
1091 	open_conns++;
1092 	pthread_mutex_unlock(&thread_lock);
1093 
1094 	if (bidi)
1095 		submit_bidi_receive(ring, c);
1096 	else
1097 		submit_receive(ring, c);
1098 
1099 	return 0;
1100 }
1101 
1102 /*
1103  * Append new segment to our currently active msg_vec. This will be submitted
1104  * as a sendmsg (with all of it), or as separate sends, later. If we're using
1105  * send_ring, then we won't hit this path. Instead, outgoing buffers are
1106  * added directly to our outgoing send buffer ring.
1107  */
1108 static void send_append_vec(struct conn_dir *cd, void *data, int len)
1109 {
1110 	struct msg_vec *mvec = snd_msg_vec(cd);
1111 
1112 	if (mvec->iov_len == mvec->vec_size) {
1113 		mvec->vec_size <<= 1;
1114 		mvec->iov = realloc(mvec->iov, mvec->vec_size * sizeof(struct iovec));
1115 	}
1116 
1117 	mvec->iov[mvec->iov_len].iov_base = data;
1118 	mvec->iov[mvec->iov_len].iov_len = len;
1119 	mvec->iov_len++;
1120 }
1121 
1122 /*
1123  * Queue a send based on the data received in this cqe, which came from
1124  * a completed receive operation.
1125  */
1126 static void send_append(struct conn *c, struct conn_dir *cd, void *data,
1127 			int bid, int len)
1128 {
1129 	vlog("%d: send %d (%p, bid %d)\n", c->tid, len, data, bid);
1130 
1131 	assert(bid < nr_bufs);
1132 
1133 	/* if using provided buffers for send, add it upfront */
1134 	if (send_ring) {
1135 		struct conn_buf_ring *cbr = &c->out_br;
1136 
1137 		io_uring_buf_ring_add(cbr->br, data, len, bid, br_mask, 0);
1138 		io_uring_buf_ring_advance(cbr->br, 1);
1139 	} else {
1140 		send_append_vec(cd, data, len);
1141 	}
1142 }
1143 
1144 /*
1145  * For non recvmsg && multishot, a zero receive marks the end. For recvmsg
1146  * with multishot, we always get the header regardless. Hence a "zero receive"
1147  * is the size of the header.
1148  */
1149 static int recv_done_res(int res)
1150 {
1151 	if (!res)
1152 		return 1;
1153 	if (rcv_msg && recv_mshot && res == sizeof(struct io_uring_recvmsg_out))
1154 		return 1;
1155 	return 0;
1156 }
1157 
1158 static int recv_inc(struct conn *c, struct conn_dir *cd, int *bid,
1159 		    struct io_uring_cqe *cqe)
1160 {
1161 	struct conn_buf_ring *cbr = &c->out_br;
1162 	struct conn_buf_ring *in_cbr = &c->in_br;
1163 	void *data;
1164 
1165 	if (!cqe->res)
1166 		return 0;
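	/*
	 * With incremental buffer consumption, IORING_CQE_F_BUF_MORE means the
	 * kernel is not yet done with this buffer, so don't recycle or forward
	 * it until the final completion for it arrives.
	 */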
1167 	if (cqe->flags & IORING_CQE_F_BUF_MORE)
1168 		return 0;
1169 
1170 	data = in_cbr->buf + *bid * buf_size;
1171 	if (is_sink) {
1172 		io_uring_buf_ring_add(in_cbr->br, data, buf_size, *bid, br_mask, 0);
1173 		io_uring_buf_ring_advance(in_cbr->br, 1);
1174 	} else if (send_ring) {
1175 		io_uring_buf_ring_add(cbr->br, data, buf_size, *bid, br_mask, 0);
1176 		io_uring_buf_ring_advance(cbr->br, 1);
1177 	} else {
1178 		send_append(c, cd, data, *bid, buf_size);
1179 	}
1180 	*bid = (*bid + 1) & (nr_bufs - 1);
1181 	return 1;
1182 }
1183 
1184 /*
1185  * Any receive that isn't recvmsg with multishot can be handled the same way.
1186  * Iterate from '*bid' and 'in_bytes' in total, and append the data to the
1187  * outgoing queue.
1188  */
1189 static int recv_bids(struct conn *c, struct conn_dir *cd, int *bid, int in_bytes)
1190 {
1191 	struct conn_buf_ring *cbr = &c->out_br;
1192 	struct conn_buf_ring *in_cbr = &c->in_br;
1193 	struct io_uring_buf *buf;
1194 	int nr_packets = 0;
1195 
1196 	while (in_bytes) {
1197 		int this_bytes;
1198 		void *data;
1199 
1200 		buf = &in_cbr->br->bufs[*bid];
1201 		data = (void *) (unsigned long) buf->addr;
1202 		this_bytes = buf->len;
1203 		if (this_bytes > in_bytes)
1204 			this_bytes = in_bytes;
1205 
1206 		in_bytes -= this_bytes;
1207 
1208 		if (send_ring)
1209 			io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1210 						br_mask, nr_packets);
1211 		else
1212 			send_append(c, cd, data, *bid, this_bytes);
1213 
1214 		*bid = (*bid + 1) & (nr_bufs - 1);
1215 		nr_packets++;
1216 	}
1217 
1218 	if (send_ring)
1219 		io_uring_buf_ring_advance(cbr->br, nr_packets);
1220 
1221 	return nr_packets;
1222 }
1223 
1224 /*
1225  * Special handling of recvmsg with multishot
1226  */
1227 static int recv_mshot_msg(struct conn *c, struct conn_dir *cd, int *bid,
1228 			  int in_bytes)
1229 {
1230 	struct conn_buf_ring *cbr = &c->out_br;
1231 	struct conn_buf_ring *in_cbr = &c->in_br;
1232 	struct io_uring_buf *buf;
1233 	int nr_packets = 0;
1234 
1235 	while (in_bytes) {
1236 		struct io_uring_recvmsg_out *pdu;
1237 		int this_bytes;
1238 		void *data;
1239 
1240 		buf = &in_cbr->br->bufs[*bid];
1241 
1242 		/*
1243 		 * multishot recvmsg puts a header in front of the data - we
1244 		 * have to take that into account for the send setup, and
1245 		 * adjust the actual data read to not take this metadata into
1246 		 * account. For this use case, namelen and controllen will not
1247 		 * be set. If they were, they would need to be factored in too.
1248 		 */
1249 		buf->len -= sizeof(struct io_uring_recvmsg_out);
1250 		in_bytes -= sizeof(struct io_uring_recvmsg_out);
1251 
1252 		pdu = (void *) (unsigned long) buf->addr;
1253 		vlog("pdu namelen %d, controllen %d, payload %d flags %x\n",
1254 				pdu->namelen, pdu->controllen, pdu->payloadlen,
1255 				pdu->flags);
1256 		data = (void *) (pdu + 1);
1257 
1258 		this_bytes = pdu->payloadlen;
1259 		if (this_bytes > in_bytes)
1260 			this_bytes = in_bytes;
1261 
1262 		in_bytes -= this_bytes;
1263 
1264 		if (send_ring)
1265 			io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1266 						br_mask, nr_packets);
1267 		else
1268 			send_append(c, cd, data, *bid, this_bytes);
1269 
1270 		*bid = (*bid + 1) & (nr_bufs - 1);
1271 		nr_packets++;
1272 	}
1273 
1274 	if (send_ring)
1275 		io_uring_buf_ring_advance(cbr->br, nr_packets);
1276 
1277 	return nr_packets;
1278 }
1279 
1280 static int __handle_recv(struct io_uring *ring, struct conn *c,
1281 			 struct conn_dir *cd, struct io_uring_cqe *cqe)
1282 {
1283 	struct conn_dir *ocd = &c->cd[!cd->index];
1284 	int bid, nr_packets;
1285 
1286 	/*
1287 	 * Not having a buffer attached should only happen if we get a zero
1288 	 * sized receive, because the other end closed the connection. It
1289 	 * cannot happen otherwise, as all our receives are using provided
1290 	 * buffers and hence it's not possible to return a CQE with a non-zero
1291 	 * result and not have a buffer attached.
1292 	 */
1293 	if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1294 		cd->pending_recv = 0;
1295 
1296 		if (!recv_done_res(cqe->res)) {
1297 			fprintf(stderr, "no buffer assigned, res=%d\n", cqe->res);
1298 			return 1;
1299 		}
1300 start_close:
1301 		prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1302 		close_cd(c, cd);
1303 		return 0;
1304 	}
1305 
1306 	if (cqe->res && cqe->res < buf_size)
1307 		cd->rcv_shrt++;
1308 
1309 	bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1310 
1311 	/*
1312 	 * BIDI will use the same buffer pool and do receive on both CDs,
1313 	 * so can't reliably check. TODO.
1314 	 */
1315 	if (!bidi && cd->rcv_next_bid != -1 && bid != cd->rcv_next_bid) {
1316 		fprintf(stderr, "recv bid %d, wanted %d\n", bid, cd->rcv_next_bid);
1317 		goto start_close;
1318 	}
1319 
1320 	vlog("%d: recv: bid=%d, res=%d, cflags=%x\n", c->tid, bid, cqe->res, cqe->flags);
1321 	/*
1322 	 * If we're a sink, we're done here. Just replenish the buffer back
1323 	 * to the pool. For proxy mode, we will send the data to the other
1324 	 * end and the buffer will be replenished once the send is done with
1325 	 * it.
1326 	 */
1327 	if (buf_ring_inc)
1328 		nr_packets = recv_inc(c, ocd, &bid, cqe);
1329 	else if (is_sink)
1330 		nr_packets = replenish_buffers(c, &bid, cqe->res);
1331 	else if (rcv_msg && recv_mshot)
1332 		nr_packets = recv_mshot_msg(c, ocd, &bid, cqe->res);
1333 	else
1334 		nr_packets = recv_bids(c, ocd, &bid, cqe->res);
1335 
1336 	if (cd->rcv_bucket)
1337 		cd->rcv_bucket[nr_packets]++;
1338 
1339 	if (!is_sink) {
1340 		ocd->out_buffers += nr_packets;
1341 		assert(ocd->out_buffers <= nr_bufs);
1342 	}
1343 
1344 	cd->rcv++;
1345 	cd->rcv_next_bid = bid;
1346 
1347 	/*
1348 	 * If IORING_CQE_F_MORE isn't set, then this is either a normal recv
1349 	 * that needs rearming, or it's a multishot that won't post any further
1350 	 * completions. Setup a new one for these cases.
1351 	 */
1352 	if (!(cqe->flags & IORING_CQE_F_MORE)) {
1353 		cd->pending_recv = 0;
1354 		if (recv_done_res(cqe->res))
1355 			goto start_close;
1356 		if (is_sink || !ocd->pending_send)
1357 			__submit_receive(ring, c, &c->cd[0], c->in_fd);
1358 	}
1359 
1360 	/*
1361 	 * Submit a send if we won't get anymore notifications from this
1362 	 * recv, or if we have nr_bufs / 2 queued up. If BIDI mode, send
1363 	 * every buffer. We assume this is interactive mode, and hence don't
1364 	 * delay anything.
1365 	 */
1366 	if (((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
1367 	    !(cqe->flags & IORING_CQE_F_MORE)) && !is_sink)
1368 		prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1369 
1370 	if (!recv_done_res(cqe->res))
1371 		cd->in_bytes += cqe->res;
1372 	return 0;
1373 }
1374 
1375 static int handle_recv(struct io_uring *ring, struct io_uring_cqe *cqe)
1376 {
1377 	struct conn *c = cqe_to_conn(cqe);
1378 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1379 
1380 	return __handle_recv(ring, c, cd, cqe);
1381 }
1382 
1383 static int recv_error(struct error_handler *err, struct io_uring *ring,
1384 		      struct io_uring_cqe *cqe)
1385 {
1386 	struct conn *c = cqe_to_conn(cqe);
1387 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1388 
1389 	cd->pending_recv = 0;
1390 
1391 	if (cqe->res != -ENOBUFS)
1392 		return default_error(err, ring, cqe);
1393 
1394 	recv_enobufs(ring, c, cd, other_dir_fd(c, cqe_to_fd(cqe)));
1395 	return 0;
1396 }
1397 
1398 static void submit_send(struct io_uring *ring, struct conn *c,
1399 			struct conn_dir *cd, int fd, void *data, int len,
1400 			int bid, int flags)
1401 {
1402 	struct io_uring_sqe *sqe;
1403 	int bgid = c->out_br.bgid;
1404 
1405 	if (cd->pending_send)
1406 		return;
1407 	cd->pending_send = 1;
1408 
1409 	flags |= MSG_WAITALL | MSG_NOSIGNAL;
1410 
1411 	sqe = get_sqe(ring);
1412 	if (snd_msg) {
1413 		struct io_msg *imsg = &cd->io_snd_msg;
1414 
1415 		if (snd_zc) {
1416 			io_uring_prep_sendmsg_zc(sqe, fd, &imsg->msg, flags);
1417 			cd->snd_notif++;
1418 		} else {
1419 			io_uring_prep_sendmsg(sqe, fd, &imsg->msg, flags);
1420 		}
1421 	} else if (send_ring) {
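		/*
		 * With a provided send buffer ring, addr/len are left NULL/0;
		 * the kernel picks buffers from the outgoing ring via
		 * IOSQE_BUFFER_SELECT and the buf_group assigned further down.
		 */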
1422 		io_uring_prep_send(sqe, fd, NULL, 0, flags);
1423 	} else if (!snd_zc) {
1424 		io_uring_prep_send(sqe, fd, data, len, flags);
1425 	} else {
1426 		io_uring_prep_send_zc(sqe, fd, data, len, flags, 0);
1427 		sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
1428 		sqe->buf_index = bid;
1429 		cd->snd_notif++;
1430 	}
1431 	encode_userdata(sqe, c, __SEND, bid, fd);
1432 	if (fixed_files)
1433 		sqe->flags |= IOSQE_FIXED_FILE;
1434 	if (send_ring) {
1435 		sqe->flags |= IOSQE_BUFFER_SELECT;
1436 		sqe->buf_group = bgid;
1437 	}
1438 	if (snd_bundle) {
1439 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
1440 		cd->snd_mshot++;
1441 	} else if (send_ring)
1442 		cd->snd_mshot++;
1443 }
1444 
1445 /*
1446  * Prepare the next send request, if we need to. If one is already pending,
1447  * or if we're a sink and we don't need to do sends, then there's nothing
1448  * to do.
1449  *
1450  * Return 1 if another send completion is expected, 0 if not.
1451  */
1452 static int prep_next_send(struct io_uring *ring, struct conn *c,
1453 			   struct conn_dir *cd, int fd)
1454 {
1455 	int bid;
1456 
1457 	if (cd->pending_send || is_sink)
1458 		return 0;
1459 	if (!cd->out_buffers)
1460 		return 0;
1461 
1462 	bid = cd->snd_next_bid;
1463 	if (bid == -1)
1464 		bid = 0;
1465 
1466 	if (send_ring) {
1467 		/*
1468 		 * send_ring mode is easy, there's nothing to do but submit
1469 		 * our next send request. That will empty the entire outgoing
1470 		 * queue.
1471 		 */
1472 		submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1473 		return 1;
1474 	} else if (snd_msg) {
1475 		/*
1476 		 * For sendmsg mode, submit our currently prepared iovec, if
1477 		 * we have one, and swap our iovecs so that any further
1478 		 * receives will start preparing that one.
1479 		 */
1480 		struct io_msg *imsg = &cd->io_snd_msg;
1481 
1482 		if (!msg_vec(imsg)->iov_len)
1483 			return 0;
1484 		imsg->msg.msg_iov = msg_vec(imsg)->iov;
1485 		imsg->msg.msg_iovlen = msg_vec(imsg)->iov_len;
1486 		msg_vec(imsg)->iov_len = 0;
1487 		imsg->vec_index = !imsg->vec_index;
1488 		submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1489 		return 1;
1490 	} else {
1491 		/*
1492 		 * send without send_ring - submit the next available vec,
1493 		 * if any. If this vec is the last one in the current series,
1494 		 * then swap to the next vec. We flag each send with MSG_MORE,
1495 		 * unless this is the last part of the current vec.
1496 		 */
1497 		struct io_msg *imsg = &cd->io_snd_msg;
1498 		struct msg_vec *mvec = msg_vec(imsg);
1499 		int flags = !snd_zc ? MSG_MORE : 0;
1500 		struct iovec *iov;
1501 
1502 		if (mvec->iov_len == mvec->cur_iov)
1503 			return 0;
1504 		imsg->msg.msg_iov = msg_vec(imsg)->iov;
1505 		iov = &mvec->iov[mvec->cur_iov];
1506 		mvec->cur_iov++;
1507 		if (mvec->cur_iov == mvec->iov_len) {
1508 			mvec->iov_len = 0;
1509 			mvec->cur_iov = 0;
1510 			imsg->vec_index = !imsg->vec_index;
1511 			flags = 0;
1512 		}
1513 		submit_send(ring, c, cd, fd, iov->iov_base, iov->iov_len, bid, flags);
1514 		return 1;
1515 	}
1516 }
1517 
1518 static int handle_send_inc(struct conn *c, struct conn_dir *cd, int bid,
1519 			   struct io_uring_cqe *cqe)
1520 {
1521 	struct conn_buf_ring *in_cbr = &c->in_br;
1522 	int ret = 0;
1523 	void *data;
1524 
1525 	if (!cqe->res)
1526 		goto out;
1527 	if (cqe->flags & IORING_CQE_F_BUF_MORE)
1528 		return 0;
1529 
1530 	assert(cqe->res <= buf_size);
1531 	cd->out_bytes += cqe->res;
1532 
1533 	data = in_cbr->buf + bid * buf_size;
1534 	io_uring_buf_ring_add(in_cbr->br, data, buf_size, bid, br_mask, 0);
1535 	io_uring_buf_ring_advance(in_cbr->br, 1);
1536 	bid = (bid + 1) & (nr_bufs - 1);
1537 	ret = 1;
1538 out:
1539 	if (pending_shutdown(c))
1540 		close_cd(c, cd);
1541 
1542 	return ret;
1543 }
1544 
1545 /*
1546  * Handling a send with an outgoing send ring. Get the buffers from the
1547  * receive side, and add them back to the incoming buffer ring.
1548  */
1549 static int handle_send_ring(struct conn *c, struct conn_dir *cd, int bid,
1550 			    int bytes)
1551 {
1552 	struct conn_buf_ring *in_cbr = &c->in_br;
1553 	struct conn_buf_ring *out_cbr = &c->out_br;
1554 	int i = 0;
1555 
1556 	while (bytes) {
1557 		struct io_uring_buf *buf = &out_cbr->br->bufs[bid];
1558 		int this_bytes;
1559 		void *this_buf;
1560 
1561 		this_bytes = buf->len;
1562 		if (this_bytes > bytes)
1563 			this_bytes = bytes;
1564 
1565 		cd->out_bytes += this_bytes;
1566 
1567 		vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1568 
1569 		this_buf = in_cbr->buf + bid * buf_size;
1570 		io_uring_buf_ring_add(in_cbr->br, this_buf, buf_size, bid, br_mask, i);
1571 		/*
1572 		 * Find the provided buffer that the receive consumed, and
1573 		 * which we then used for the send, and add it back to the
1574 		 * pool so it can get picked by another receive. Once the send
1575 		 * is done, we're done with it.
1576 		 */
1577 		bid = (bid + 1) & (nr_bufs - 1);
1578 		bytes -= this_bytes;
1579 		i++;
1580 	}
1581 	cd->snd_next_bid = bid;
1582 	io_uring_buf_ring_advance(in_cbr->br, i);
1583 
1584 	if (pending_shutdown(c))
1585 		close_cd(c, cd);
1586 
1587 	return i;
1588 }
1589 
1590 /*
1591  * sendmsg, or send without a ring. Just add the buffers back to the
1592  * incoming ring so they can be reused for new receives.
1593  */
1594 static int handle_send_buf(struct conn *c, struct conn_dir *cd, int bid,
1595 			   int bytes)
1596 {
1597 	struct conn_buf_ring *in_cbr = &c->in_br;
1598 	int i = 0;
1599 
1600 	while (bytes) {
1601 		struct io_uring_buf *buf = &in_cbr->br->bufs[bid];
1602 		int this_bytes;
1603 
1604 		this_bytes = bytes;
1605 		if (this_bytes > buf->len)
1606 			this_bytes = buf->len;
1607 
1608 		vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1609 
1610 		cd->out_bytes += this_bytes;
1611 		/* each recvmsg mshot package has this overhead */
1612 		if (rcv_msg && recv_mshot)
1613 			cd->out_bytes += sizeof(struct io_uring_recvmsg_out);
1614 		replenish_buffer(in_cbr, bid, i);
1615 		bid = (bid + 1) & (nr_bufs - 1);
1616 		bytes -= this_bytes;
1617 		i++;
1618 	}
1619 	io_uring_buf_ring_advance(in_cbr->br, i);
1620 	cd->snd_next_bid = bid;
1621 	return i;
1622 }
1623 
1624 static int __handle_send(struct io_uring *ring, struct conn *c,
1625 			 struct conn_dir *cd, struct io_uring_cqe *cqe)
1626 {
1627 	struct conn_dir *ocd;
1628 	int bid, nr_packets;
1629 
1630 	if (send_ring) {
1631 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1632 			fprintf(stderr, "no buffer in send?! %d\n", cqe->res);
1633 			return 1;
1634 		}
1635 		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1636 	} else {
1637 		bid = cqe_to_bid(cqe);
1638 	}
1639 
1640 	/*
1641 	 * CQE notifications only happen with send/sendmsg zerocopy. They
1642 	 * tell us that the data has been acked, and that hence the buffer
1643 	 * is now free to reuse. Waiting on an ACK for each packet will slow
1644 	 * us down tremendously, so do all of our sends and then wait for
1645 	 * the ACKs to come in. They tend to come in bundles anyway. Once
1646 	 * all acks are done (cd->snd_notif == 0), then fire off the next
1647 	 * receive.
1648 	 */
1649 	if (cqe->flags & IORING_CQE_F_NOTIF) {
1650 		cd->snd_notif--;
1651 	} else {
1652 		if (cqe->res && cqe->res < buf_size)
1653 			cd->snd_shrt++;
1654 
1655 		/*
1656 		 * BIDI will use the same buffer pool and do sends on both CDs,
1657 		 * so can't reliably check. TODO.
1658 		 */
1659 		if (!bidi && send_ring && cd->snd_next_bid != -1 &&
1660 		    bid != cd->snd_next_bid) {
1661 			fprintf(stderr, "send bid %d, wanted %d at %lu\n", bid,
1662 					cd->snd_next_bid, cd->out_bytes);
1663 			goto out_close;
1664 		}
1665 
1666 		assert(bid <= nr_bufs);
1667 
1668 		vlog("send: got %d, %lu\n", cqe->res, cd->out_bytes);
1669 
1670 		if (buf_ring_inc)
1671 			nr_packets = handle_send_inc(c, cd, bid, cqe);
1672 		else if (send_ring)
1673 			nr_packets = handle_send_ring(c, cd, bid, cqe->res);
1674 		else
1675 			nr_packets = handle_send_buf(c, cd, bid, cqe->res);
1676 
1677 		if (cd->snd_bucket)
1678 			cd->snd_bucket[nr_packets]++;
1679 
1680 		cd->out_buffers -= nr_packets;
1681 		assert(cd->out_buffers >= 0);
1682 
1683 		cd->snd++;
1684 	}
1685 
1686 	if (!(cqe->flags & IORING_CQE_F_MORE)) {
1687 		int do_recv_arm;
1688 
1689 		cd->pending_send = 0;
1690 
1691 		/*
1692 		 * send done - see if the current vec has data to submit, and
1693 		 * do so if it does. if it doesn't have data yet, nothing to
1694 		 * do.
1695 		 */
1696 		do_recv_arm = !prep_next_send(ring, c, cd, cqe_to_fd(cqe));
1697 
1698 		ocd = &c->cd[!cd->index];
1699 		if (!cd->snd_notif && do_recv_arm && !ocd->pending_recv) {
1700 			int fd = other_dir_fd(c, cqe_to_fd(cqe));
1701 
1702 			__submit_receive(ring, c, ocd, fd);
1703 		}
1704 out_close:
1705 		if (pending_shutdown(c))
1706 			close_cd(c, cd);
1707 	}
1708 
1709 	vlog("%d: pending sends %d\n", c->tid, cd->pending_send);
1710 	return 0;
1711 }
1712 
1713 static int handle_send(struct io_uring *ring, struct io_uring_cqe *cqe)
1714 {
1715 	struct conn *c = cqe_to_conn(cqe);
1716 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1717 
1718 	return __handle_send(ring, c, cd, cqe);
1719 }
1720 
1721 static int send_error(struct error_handler *err, struct io_uring *ring,
1722 		      struct io_uring_cqe *cqe)
1723 {
1724 	struct conn *c = cqe_to_conn(cqe);
1725 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1726 
1727 	cd->pending_send = 0;
1728 
1729 	/* res can have high bit set */
1730 	if (cqe->flags & IORING_CQE_F_NOTIF)
1731 		return handle_send(ring, cqe);
1732 	if (cqe->res != -ENOBUFS)
1733 		return default_error(err, ring, cqe);
1734 
1735 	cd->snd_enobufs++;
1736 	return 0;
1737 }
1738 
1739 /*
1740  * We don't expect to get here, as we marked the shutdown to skip posting a
1741  * CQE if it was successful. If it does trigger, that means it failed and
1742  * that our close has not been done. Log the shutdown error and issue a new
1743  * separate close.
1744  */
1745 static int handle_shutdown(struct io_uring *ring, struct io_uring_cqe *cqe)
1746 {
1747 	struct conn *c = cqe_to_conn(cqe);
1748 	struct io_uring_sqe *sqe;
1749 	int fd = cqe_to_fd(cqe);
1750 
1751 	fprintf(stderr, "Got shutdown notification on fd %d\n", fd);
1752 
1753 	if (!cqe->res)
1754 		fprintf(stderr, "Unexpected success shutdown CQE\n");
1755 	else if (cqe->res < 0)
1756 		fprintf(stderr, "Shutdown got %s\n", strerror(-cqe->res));
1757 
1758 	sqe = get_sqe(ring);
1759 	if (fixed_files)
1760 		io_uring_prep_close_direct(sqe, fd);
1761 	else
1762 		io_uring_prep_close(sqe, fd);
1763 	encode_userdata(sqe, c, __CLOSE, 0, fd);
1764 	return 0;
1765 }
1766 
1767 /*
1768  * Final stage of a connection, the shutdown and close has finished. Mark
1769  * it as disconnected and let the main loop reap it.
1770  */
1771 static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
1772 {
1773 	struct conn *c = cqe_to_conn(cqe);
1774 	int fd = cqe_to_fd(cqe);
1775 
1776 	printf("Closed client: id=%d, in_fd=%d, out_fd=%d\n", c->tid, c->in_fd, c->out_fd);
1777 	if (fd == c->in_fd)
1778 		c->in_fd = -1;
1779 	else if (fd == c->out_fd)
1780 		c->out_fd = -1;
1781 
1782 	if (c->in_fd == -1 && c->out_fd == -1) {
1783 		c->flags |= CONN_F_DISCONNECTED;
1784 
1785 		pthread_mutex_lock(&thread_lock);
1786 		__show_stats(c);
1787 		open_conns--;
1788 		pthread_mutex_unlock(&thread_lock);
1789 		free_buffer_rings(ring, c);
1790 		free_msgs(&c->cd[0]);
1791 		free_msgs(&c->cd[1]);
1792 		free(c->cd[0].rcv_bucket);
1793 		free(c->cd[0].snd_bucket);
1794 	}
1795 
1796 	return 0;
1797 }
1798 
1799 static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
1800 {
1801 	struct conn *c = cqe_to_conn(cqe);
1802 	int fd = cqe_to_fd(cqe);
1803 
1804 	c->pending_cancels--;
1805 
1806 	vlog("%d: got cancel fd %d, refs %d\n", c->tid, fd, c->pending_cancels);
1807 
1808 	if (!c->pending_cancels) {
1809 		queue_shutdown_close(ring, c, c->in_fd);
1810 		if (c->out_fd != -1)
1811 			queue_shutdown_close(ring, c, c->out_fd);
1812 		io_uring_submit(ring);
1813 	}
1814 
1815 	return 0;
1816 }
1817 
1818 static void open_socket(struct conn *c)
1819 {
1820 	if (is_sink) {
1821 		pthread_mutex_lock(&thread_lock);
1822 		open_conns++;
1823 		pthread_mutex_unlock(&thread_lock);
1824 
1825 		submit_receive(&c->ring, c);
1826 	} else {
1827 		struct io_uring_sqe *sqe;
1828 		int domain;
1829 
1830 		if (ipv6)
1831 			domain = AF_INET6;
1832 		else
1833 			domain = AF_INET;
1834 
1835 		/*
1836 		 * If fixed_files is set, proxy will use fixed files for any new
1837 		 * file descriptors it instantiates. Fixed files, or fixed
1838 		 * descriptors, are io_uring private file descriptors. They
1839 		 * cannot be accessed outside of io_uring. io_uring holds a
1840 		 * fixed reference to them, which means that we do not need to
1841 		 * grab per-request references to them. Particularly for
1842 		 * threaded applications, grabbing and dropping file references
1843 		 * for each operation can be costly as the file table is shared.
1844 		 * This generally shows up as fget/fput related overhead in any
1845 		 * workload profiles.
1846 		 *
1847 		 * Fixed descriptors are passed in via the 'fd' field just like
1848 		 * regular descriptors, and then marked as such by setting the
1849 		 * IOSQE_FIXED_FILE flag in the sqe->flags field. Some helpers
1850 		 * do that automatically, like the below, others will need it
1851 		 * set manually if they don't have a *direct*() helper.
1852 		 *
1853 		 * For operations that instantiate them, like the opening of a
1854 		 * direct socket, the application may either ask the kernel to
1855 		 * find a free one (as is done below), or the application may
1856 		 * manage the space itself and pass in an index for a currently
1857 		 * free slot in the table. If the kernel is asked to allocate a
1858 		 * free direct descriptor, note that io_uring does not abide by
1859 		 * the POSIX mandated "lowest free must be returned". It may
1860 		 * return any free descriptor of its choosing.
1861 		 */
1862 		sqe = get_sqe(&c->ring);
1863 		if (fixed_files)
1864 			io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
1865 		else
1866 			io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
1867 		encode_userdata(sqe, c, __SOCK, 0, 0);
1868 	}
1869 }
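/*
 * A minimal sketch of the alternative mentioned above, where the application
 * manages the direct descriptor table itself and passes in an explicitly
 * chosen free slot rather than asking the kernel to allocate one. 'file_idx'
 * is a hypothetical, currently unused slot and not part of this program:
 *
 *	sqe = get_sqe(&c->ring);
 *	io_uring_prep_socket_direct(sqe, domain, SOCK_STREAM, 0, file_idx, 0);
 *	encode_userdata(sqe, c, __SOCK, 0, file_idx);
 */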
1870 
1871 /*
1872  * Start of connection, we got our in descriptor.
1873  */
1874 static int handle_fd_pass(struct io_uring_cqe *cqe)
1875 {
1876 	struct conn *c = cqe_to_conn(cqe);
1877 	int fd = cqe_to_fd(cqe);
1878 
1879 	vlog("%d: got fd pass %d\n", c->tid, fd);
1880 	c->in_fd = fd;
1881 	open_socket(c);
1882 	return 0;
1883 }
1884 
1885 static int handle_stop(struct io_uring_cqe *cqe)
1886 {
1887 	struct conn *c = cqe_to_conn(cqe);
1888 
1889 	printf("Client %d: queueing shutdown\n", c->tid);
1890 	queue_cancel(&c->ring, c);
1891 	return 0;
1892 }
1893 
1894 /*
1895  * Called for each CQE that we receive. Decode the request type that it
1896  * came from, and call the appropriate handler.
1897  */
1898 static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
1899 {
1900 	int ret;
1901 
1902 	/*
1903 	 * Unlikely, but there's an error in this CQE. If an error handler
1904 	 * is defined, call it, and that will deal with it. If no error
1905 	 * handler is defined, the opcode handler either doesn't care or will
1906 	 * handle it on its own.
1907 	 */
1908 	if (cqe->res < 0) {
1909 		struct error_handler *err = &error_handlers[cqe_to_op(cqe)];
1910 
1911 		if (err->error_fn)
1912 			return err->error_fn(err, ring, cqe);
1913 	}
1914 
1915 	switch (cqe_to_op(cqe)) {
1916 	case __ACCEPT:
1917 		ret = handle_accept(ring, cqe);
1918 		break;
1919 	case __SOCK:
1920 		ret = handle_sock(ring, cqe);
1921 		break;
1922 	case __CONNECT:
1923 		ret = handle_connect(ring, cqe);
1924 		break;
1925 	case __RECV:
1926 	case __RECVMSG:
1927 		ret = handle_recv(ring, cqe);
1928 		break;
1929 	case __SEND:
1930 	case __SENDMSG:
1931 		ret = handle_send(ring, cqe);
1932 		break;
1933 	case __CANCEL:
1934 		ret = handle_cancel(ring, cqe);
1935 		break;
1936 	case __SHUTDOWN:
1937 		ret = handle_shutdown(ring, cqe);
1938 		break;
1939 	case __CLOSE:
1940 		ret = handle_close(ring, cqe);
1941 		break;
1942 	case __FD_PASS:
1943 		ret = handle_fd_pass(cqe);
1944 		break;
1945 	case __STOP:
1946 		ret = handle_stop(cqe);
1947 		break;
1948 	case __NOP:
1949 		ret = 0;
1950 		break;
1951 	default:
1952 		fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
1953 		return 1;
1954 	}
1955 
1956 	return ret;
1957 }
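/*
 * The dispatch above works because the request type (and fd) can be recovered
 * from cqe->user_data. The real encoding lives in proxy.h; a purely
 * hypothetical packing could put the op in the upper 32 bits and the fd in
 * the lower 32 bits of the 64-bit value:
 *
 *	static inline uint64_t pack_userdata(unsigned int op, unsigned int fd)
 *	{
 *		return ((uint64_t) op << 32) | fd;
 *	}
 *
 *	op = (unsigned int) (cqe->user_data >> 32);
 *	fd = (unsigned int) (cqe->user_data & 0xffffffffU);
 */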
1958 
1959 static void house_keeping(struct io_uring *ring)
1960 {
1961 	static unsigned long last_bytes;
1962 	unsigned long bytes, elapsed;
1963 	struct conn *c;
1964 	int i, j;
1965 
1966 	vlog("House keeping entered\n");
1967 
1968 	bytes = 0;
1969 	for (i = 0; i < nr_conns; i++) {
1970 		c = &conns[i];
1971 
1972 		for (j = 0; j < 2; j++) {
1973 			struct conn_dir *cd = &c->cd[j];
1974 
1975 			bytes += cd->in_bytes + cd->out_bytes;
1976 		}
1977 		if (c->flags & CONN_F_DISCONNECTED) {
1978 			vlog("%d: disconnected\n", i);
1979 
1980 			if (!(c->flags & CONN_F_REAPED)) {
1981 				void *ret;
1982 
1983 				pthread_join(c->thread, &ret);
1984 				c->flags |= CONN_F_REAPED;
1985 			}
1986 			continue;
1987 		}
1988 		if (c->flags & CONN_F_DISCONNECTING)
1989 			continue;
1990 
1991 		if (should_shutdown(c)) {
1992 			__close_conn(ring, c);
1993 			c->flags |= CONN_F_DISCONNECTING;
1994 		}
1995 	}
1996 
1997 	elapsed = mtime_since_now(&last_housekeeping);
1998 	if (bytes && elapsed >= 900) {
1999 		unsigned long bw;
2000 
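		/*
		 * The delta is in bytes and 'elapsed' is in msec:
		 * 8 * delta / 1000 yields kilobits, and kilobits per msec
		 * equals Mbit per second.
		 */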
2001 		bw = (8 * (bytes - last_bytes) / 1000UL) / elapsed;
2002 		if (bw) {
2003 			if (open_conns)
2004 				printf("Bandwidth (threads=%d): %'luMbit\n", open_conns, bw);
2005 			gettimeofday(&last_housekeeping, NULL);
2006 			last_bytes = bytes;
2007 		}
2008 	}
2009 }
2010 
2011 /*
2012  * Event loop shared between the parent, and the connections. Could be
2013  * split in two, as they don't handle the same types of events. For the per
2014  * connection loop, 'c' is valid. For the main loop, it's NULL.
2015  */
2016 static int __event_loop(struct io_uring *ring, struct conn *c)
2017 {
2018 	struct __kernel_timespec active_ts, idle_ts;
2019 	int flags;
2020 
2021 	idle_ts.tv_sec = 0;
2022 	idle_ts.tv_nsec = 100000000LL;
2023 	active_ts = idle_ts;
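	/*
	 * Convert the configured wait_usec timeout into the seconds plus
	 * nanoseconds form that the kernel timespec expects.
	 */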
2024 	if (wait_usec > 1000000) {
2025 		active_ts.tv_sec = wait_usec / 1000000;
2026 		wait_usec -= active_ts.tv_sec * 1000000;
2027 	}
2028 	active_ts.tv_nsec = wait_usec * 1000;
2029 
2030 	gettimeofday(&last_housekeeping, NULL);
2031 
2032 	flags = 0;
2033 	while (1) {
2034 		struct __kernel_timespec *ts = &idle_ts;
2035 		struct io_uring_cqe *cqe;
2036 		unsigned int head;
2037 		int ret, i, to_wait;
2038 
2039 		/*
2040 		 * If wait_batch is set higher than 1, then we'll wait on
2041 		 * that amount of CQEs to be posted each loop. If used with
2042 		 * DEFER_TASKRUN, this can provide a substantial reduction
2043 		 * in context switch rate as the task isn't woken until the
2044 		 * requested number of events can be returned.
2045 		 *
2046 		 * Can be used with -t to set a wait_usec timeout as well.
2047 		 * For example, if an application can deal with 250 usec
2048 		 * of wait latencies, it can set -w8 -t250 which will cause
2049 		 * io_uring to return when either 8 events have been received,
2050 		 * or if 250 usec of waiting has passed.
2051 		 *
2052 		 * If we don't have any open connections, wait on just 1
2053 		 * always.
2054 		 */
2055 		to_wait = 1;
2056 		if (open_conns && !flags) {
2057 			ts = &active_ts;
2058 			to_wait = wait_batch;
2059 		}
2060 
2061 		vlog("Submit and wait for %d\n", to_wait);
2062 		ret = io_uring_submit_and_wait_timeout(ring, &cqe, to_wait, ts, NULL);
2063 
2064 		if (*ring->cq.koverflow)
2065 			printf("overflow %u\n", *ring->cq.koverflow);
2066 		if (*ring->sq.kflags & IORING_SQ_CQ_OVERFLOW)
2067 			printf("saw overflow\n");
2068 
2069 		vlog("Submit and wait: %d\n", ret);
2070 
2071 		i = flags = 0;
2072 		io_uring_for_each_cqe(ring, head, cqe) {
2073 			if (handle_cqe(ring, cqe))
2074 				return 1;
2075 			flags |= cqe_to_conn(cqe)->flags;
2076 			++i;
2077 		}
2078 
2079 		vlog("Handled %d events\n", i);
2080 
2081 		/*
2082 		 * Advance the CQ ring for seen events when we've processed
2083 		 * all of them in this loop. This can also be done with
2084 		 * io_uring_cqe_seen() in each handler above, which just marks
2085 		 * that single CQE as seen. However, it's more efficient to
2086 		 * mark a batch as seen when we're done with that batch.
2087 		 */
2088 		if (i) {
2089 			io_uring_cq_advance(ring, i);
2090 			events += i;
2091 		}
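		/*
		 * For contrast, a per-CQE variant of the reaping above could
		 * look like the sketch below, marking each CQE as seen once it
		 * has been handled. It works, but gives up the batched advance:
		 *
		 *	while (!io_uring_peek_cqe(ring, &cqe)) {
		 *		if (handle_cqe(ring, cqe))
		 *			return 1;
		 *		io_uring_cqe_seen(ring, cqe);
		 *	}
		 */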
2092 
2093 		event_loops++;
2094 		if (c) {
2095 			if (c->flags & CONN_F_DISCONNECTED)
2096 				break;
2097 		} else {
2098 			house_keeping(ring);
2099 		}
2100 	}
2101 
2102 	return 0;
2103 }
2104 
2105 /*
2106  * Main event loop. Submit our multishot accept request, and then just loop
2107  * around handling incoming connections.
2108  */
2109 static int parent_loop(struct io_uring *ring, int fd)
2110 {
2111 	struct io_uring_sqe *sqe;
2112 
2113 	/*
2114 	 * proxy provides a way to use either multishot receive or not, but
2115 	 * for accept, we always use multishot. A multishot accept request
2116 	 * need only be armed once, and then it'll trigger a completion and
2117 	 * post a CQE whenever a new connection is accepted. No need to do
2118 	 * anything else, unless the multishot accept terminates. This happens
2119 	 * if it encounters an error. Applications should check for
2120 	 * IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
2121 	 * are expected from this request or not. Non-multishot requests never
2122 	 * have it set, whereas multishot requests will always have it set
2123 	 * unless an error occurs.
2124 	 */
2125 	sqe = get_sqe(ring);
2126 	if (fixed_files)
2127 		io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
2128 	else
2129 		io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
2130 	__encode_userdata(sqe, 0, __ACCEPT, 0, fd);
2131 
2132 	return __event_loop(ring, NULL);
2133 }
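/*
 * If the multishot accept does terminate (IORING_CQE_F_MORE cleared in the
 * CQE), it has to be re-armed before new connections can be accepted. A
 * hedged sketch of what an accept handler could do - this proxy's real
 * handle_accept() lives earlier in the file and may differ:
 *
 *	if (!(cqe->flags & IORING_CQE_F_MORE)) {
 *		sqe = get_sqe(ring);
 *		io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
 *		__encode_userdata(sqe, 0, __ACCEPT, 0, fd);
 *	}
 */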
2134 
2135 static int init_ring(struct io_uring *ring, int nr_files)
2136 {
2137 	struct io_uring_params params;
2138 	int ret;
2139 
2140 	/*
2141 	 * By default, set us up with a big CQ ring. Not strictly needed
2142 	 * here, but it's very important to never overflow the CQ ring.
2143 	 * Events will not be dropped if this happens, but it does slow
2144 	 * the application down when dealing with overflowed events.
2145 	 *
2146 	 * Set SINGLE_ISSUER, which tells the kernel that only one thread
2147 	 * is doing IO submissions. This enables certain optimizations in
2148 	 * the kernel.
2149 	 */
2150 	memset(&params, 0, sizeof(params));
2151 	params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
2152 	params.flags |= IORING_SETUP_CQSIZE;
2153 	params.cq_entries = 1024;
2154 
2155 	/*
2156 	 * If use_huge is set, setup the ring with IORING_SETUP_NO_MMAP. This
2157 	 * means that the application allocates the memory for the ring, and
2158 	 * the kernel maps it. The alternative is having the kernel allocate
2159 	 * the memory, and then liburing will mmap it. But we can't really
2160 	 * support huge pages that way. If this fails, then ensure that the
2161 	 * system has huge pages set aside upfront.
2162 	 */
2163 	if (use_huge)
2164 		params.flags |= IORING_SETUP_NO_MMAP;
2165 
2166 	/*
2167 	 * DEFER_TASKRUN decouples async event reaping and retrying from
2168 	 * regular system calls. If this isn't set, then io_uring uses
2169 	 * normal task_work for this. task_work is always being run on any
2170 	 * exit to userspace. Real applications do more than just call IO
2171 	 * related system calls, and hence we can be running this work way
2172 	 * too often. Using DEFER_TASKRUN defers any task_work running to
2173 	 * when the application enters the kernel anyway to wait on new
2174 	 * events. It's generally the preferred and recommended way to setup
2175 	 * a ring.
2176 	 */
2177 	if (defer_tw) {
2178 		params.flags |= IORING_SETUP_DEFER_TASKRUN;
2179 		sqpoll = 0;
2180 	}
2181 
2182 	/*
2183 	 * SQPOLL offloads any request submission and retry operations to a
2184 	 * dedicated thread. This enables an application to do IO without
2185 	 * ever having to enter the kernel itself. The SQPOLL thread will
2186 	 * stay busy as long as there's work to do, and go to sleep if
2187 	 * sq_thread_idle msecs have passed. If it's running, submitting new
2188 	 * IO just needs to make it visible to the SQPOLL thread; there is no
2189 	 * need to enter the kernel. For submission, the application will only
2190 	 * enter the kernel if the SQPOLL has been idle long enough that it
2191 	 * has gone to sleep.
2192 	 *
2193 	 * Waiting on events still requires entering the kernel, if none are
2194 	 * available. The application may also use io_uring_peek_cqe() to
2195 	 * check for new events without entering the kernel, as completions
2196 	 * will be continually produced to the CQ ring by the SQPOLL thread
2197 	 * as they occur.
2198 	 */
2199 	if (sqpoll) {
2200 		params.flags |= IORING_SETUP_SQPOLL;
2201 		params.sq_thread_idle = 1000;
2202 		defer_tw = 0;
2203 	}
2204 
2205 	/*
2206 	 * If neither DEFER_TASKRUN nor SQPOLL is used, set COOP_TASKRUN. This
2207 	 * avoids heavy signal based notifications, which can force an
2208 	 * application to enter the kernel and process it as soon as they
2209 	 * occur.
2210 	 */
2211 	if (!sqpoll && !defer_tw)
2212 		params.flags |= IORING_SETUP_COOP_TASKRUN;
2213 
2214 	/*
2215 	 * The SQ ring size need not be larger than any batch of requests
2216 	 * that need to be prepared before submit. Normally in a loop we'd
2217 	 * only need a few, if any, particularly if multishot is used.
2218 	 */
2219 	ret = io_uring_queue_init_params(ring_size, ring, &params);
2220 	if (ret) {
2221 		fprintf(stderr, "%s\n", strerror(-ret));
2222 		return 1;
2223 	}
2224 
2225 	/*
2226 	 * If send serialization is available and no option was given to use
2227 	 * it or not, default it to on. If it was turned on and the kernel
2228 	 * doesn't support it, turn it off.
2229 	 */
2230 	if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
2231 		if (send_ring == -1)
2232 			send_ring = 1;
2233 	} else {
2234 		if (send_ring == 1) {
2235 			fprintf(stderr, "Kernel doesn't support ring provided "
2236 				"buffers for sends, disabled\n");
2237 		}
2238 		send_ring = 0;
2239 	}
2240 
2241 	if (!send_ring && snd_bundle) {
2242 		fprintf(stderr, "Can't use send bundle without send_ring\n");
2243 		snd_bundle = 0;
2244 	}
2245 
2246 	if (fixed_files) {
2247 		/*
2248 		 * If fixed files are used, we need to allocate a fixed file
2249 		 * table upfront where new direct descriptors can be managed.
2250 		 */
2251 		ret = io_uring_register_files_sparse(ring, nr_files);
2252 		if (ret) {
2253 			fprintf(stderr, "file register: %d\n", ret);
2254 			return 1;
2255 		}
2256 
2257 		/*
2258 		 * If fixed files are used, we also register the ring fd. See
2259 		 * comment near io_uring_prep_socket_direct_alloc() further
2260 		 * down. This avoids the fget/fput overhead associated with
2261 		 * the io_uring_enter(2) system call itself, which is used to
2262 		 * submit and wait on events.
2263 		 */
2264 		ret = io_uring_register_ring_fd(ring);
2265 		if (ret != 1) {
2266 			fprintf(stderr, "ring register: %d\n", ret);
2267 			return 1;
2268 		}
2269 	}
2270 
2271 	if (napi) {
2272 		struct io_uring_napi n = {
2273 			.prefer_busy_poll = napi > 1 ? 1 : 0,
2274 			.busy_poll_to = napi_timeout,
2275 		};
2276 
2277 		ret = io_uring_register_napi(ring, &n);
2278 		if (ret) {
2279 			fprintf(stderr, "io_uring_register_napi: %d\n", ret);
2280 			if (ret != -EINVAL)
2281 				return 1;
2282 			fprintf(stderr, "NAPI not available, turned off\n");
2283 		}
2284 	}
2285 
2286 	return 0;
2287 }
2288 
2289 static void *thread_main(void *data)
2290 {
2291 	struct conn *c = data;
2292 	int ret;
2293 
2294 	c->flags |= CONN_F_STARTED;
2295 
2296 	/* we need a max of 4 descriptors for each client */
2297 	ret = init_ring(&c->ring, 4);
2298 	if (ret)
2299 		goto done;
2300 
2301 	if (setup_buffer_rings(&c->ring, c))
2302 		goto done;
2303 
2304 	/*
2305 	 * If we're using fixed files, then we need to wait for the parent
2306 	 * to install the c->in_fd into our direct descriptor table. When
2307 	 * that happens, we'll set things up. If we're not using fixed files,
2308 	 * we can set up the receive or connect now.
2309 	 */
2310 	if (!fixed_files)
2311 		open_socket(c);
2312 
2313 	/* we're ready */
2314 	pthread_barrier_wait(&c->startup_barrier);
2315 
2316 	__event_loop(&c->ring, c);
2317 done:
2318 	return NULL;
2319 }
2320 
2321 static void usage(const char *name)
2322 {
2323 	printf("%s:\n", name);
2324 	printf("\t-m:\t\tUse multishot receive (%d)\n", recv_mshot);
2325 	printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
2326 	printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
2327 	printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
2328 	printf("\t-a:\t\tUse huge pages for the ring (%d)\n", use_huge);
2329 	printf("\t-t:\t\tTimeout for waiting on CQEs (usec) (%d)\n", wait_usec);
2330 	printf("\t-w:\t\tNumber of CQEs to wait for each loop (%d)\n", wait_batch);
2331 	printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
2332 	printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
2333 	printf("\t-q:\t\tRing size to use (%d)\n", ring_size);
2334 	printf("\t-H:\t\tHost to connect to (%s)\n", host);
2335 	printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
2336 	printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
2337 	printf("\t-6:\t\tUse IPv6 (%d)\n", ipv6);
2338 	printf("\t-N:\t\tUse NAPI polling (%d)\n", napi);
2339 	printf("\t-T:\t\tNAPI timeout (usec) (%d)\n", napi_timeout);
2340 	printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
2341 	printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
2342 	printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring);
2343 	printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle);
2344 	printf("\t-z:\t\tUse zerocopy send (%d)\n", snd_zc);
2345 	printf("\t-c:\t\tUse bundles for recv (%d)\n", rcv_bundle);
2346 	printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg);
2347 	printf("\t-R:\t\tUse recvmsg (%d)\n", rcv_msg);
2348 	printf("\t-x:\t\tShow extended stats (%d)\n", ext_stat);
2349 	printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
2350 }
2351 
2352 /*
2353  * Option parsing and the ring / net setup
2354  */
2355 int main(int argc, char *argv[])
2356 {
2357 	struct io_uring ring;
2358 	struct sigaction sa = { };
2359 	const char *optstring;
2360 	int opt, ret, fd;
2361 
2362 	setlocale(LC_NUMERIC, "en_US");
2363 
2364 	page_size = sysconf(_SC_PAGESIZE);
2365 	if (page_size < 0) {
2366 		perror("sysconf(_SC_PAGESIZE)");
2367 		return 1;
2368 	}
2369 
2370 	pthread_mutex_init(&thread_lock, NULL);
2371 
2372 	optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:z:i:6Vh?";
2373 	while ((opt = getopt(argc, argv, optstring)) != -1) {
2374 		switch (opt) {
2375 		case 'm':
2376 			recv_mshot = !!atoi(optarg);
2377 			break;
2378 		case 'S':
2379 			sqpoll = !!atoi(optarg);
2380 			break;
2381 		case 'd':
2382 			defer_tw = !!atoi(optarg);
2383 			break;
2384 		case 'b':
2385 			buf_size = atoi(optarg);
2386 			break;
2387 		case 'n':
2388 			nr_bufs = atoi(optarg);
2389 			break;
2390 		case 'u':
2391 			send_ring = !!atoi(optarg);
2392 			break;
2393 		case 'c':
2394 			rcv_bundle = !!atoi(optarg);
2395 			break;
2396 		case 'C':
2397 			snd_bundle = !!atoi(optarg);
2398 			break;
2399 		case 'w':
2400 			wait_batch = atoi(optarg);
2401 			break;
2402 		case 't':
2403 			wait_usec = atoi(optarg);
2404 			break;
2405 		case 's':
2406 			is_sink = !!atoi(optarg);
2407 			break;
2408 		case 'f':
2409 			fixed_files = !!atoi(optarg);
2410 			break;
2411 		case 'H':
2412 			host = strdup(optarg);
2413 			break;
2414 		case 'r':
2415 			receive_port = atoi(optarg);
2416 			break;
2417 		case 'p':
2418 			send_port = atoi(optarg);
2419 			break;
2420 		case 'B':
2421 			bidi = !!atoi(optarg);
2422 			break;
2423 		case 'N':
2424 			napi = !!atoi(optarg);
2425 			break;
2426 		case 'T':
2427 			napi_timeout = atoi(optarg);
2428 			break;
2429 		case '6':
2430 			ipv6 = true;
2431 			break;
2432 		case 'M':
2433 			snd_msg = !!atoi(optarg);
2434 			break;
2435 		case 'z':
2436 			snd_zc = !!atoi(optarg);
2437 			break;
2438 		case 'R':
2439 			rcv_msg = !!atoi(optarg);
2440 			break;
2441 		case 'q':
2442 			ring_size = atoi(optarg);
2443 			break;
2444 		case 'i':
2445 			buf_ring_inc = !!atoi(optarg);
2446 			break;
2447 		case 'a':
2448 			use_huge = !!atoi(optarg);
2449 			break;
2450 		case 'x':
2451 			ext_stat = !!atoi(optarg);
2452 			break;
2453 		case 'V':
2454 			verbose++;
2455 			break;
2456 		case 'h':
2457 		default:
2458 			usage(argv[0]);
2459 			return 1;
2460 		}
2461 	}
2462 
2463 	if (bidi && is_sink) {
2464 		fprintf(stderr, "Can't be both bidi proxy and sink\n");
2465 		return 1;
2466 	}
2467 	if (snd_msg && sqpoll) {
2468 		fprintf(stderr, "SQPOLL with msg variants disabled\n");
2469 		snd_msg = 0;
2470 	}
2471 	if (rcv_msg && rcv_bundle) {
2472 		fprintf(stderr, "Can't use bundles with recvmsg\n");
2473 		rcv_msg = 0;
2474 	}
2475 	if (snd_msg && snd_bundle) {
2476 		fprintf(stderr, "Can't use bundles with sendmsg\n");
2477 		snd_msg = 0;
2478 	}
2479 	if (snd_msg && send_ring) {
2480 		fprintf(stderr, "Can't use send ring with sendmsg\n");
2481 		snd_msg = 0;
2482 	}
2483 	if (snd_zc && (send_ring || snd_bundle)) {
2484 		fprintf(stderr, "Can't use send zc with bundles or ring\n");
2485 		send_ring = snd_bundle = 0;
2486 	}
2487 	/*
2488 	 * For recvmsg w/multishot, we waste some data at the head of the
2489 	 * packet every time. Adjust the buffer size to account for that,
2490 	 * so we're still handling 'buf_size' bytes of actual payload.
2491 	 */
2492 	if (rcv_msg && recv_mshot) {
2493 		fprintf(stderr, "Adjusted buf size for recvmsg w/multishot\n");
2494 		buf_size += sizeof(struct io_uring_recvmsg_out);
2495 	}
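	/*
	 * That header is a struct io_uring_recvmsg_out, which liburing can
	 * decode. A hedged sketch of picking apart a multishot recvmsg
	 * completion - 'buf' (the selected provided buffer) and 'msg' (the
	 * msghdr the request was prepared with) are hypothetical names here:
	 *
	 *	struct io_uring_recvmsg_out *o;
	 *
	 *	o = io_uring_recvmsg_validate(buf, cqe->res, &msg);
	 *	if (o) {
	 *		void *data = io_uring_recvmsg_payload(o, &msg);
	 *		unsigned int len = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
	 *		...
	 *	}
	 */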
2496 
2497 	br_mask = nr_bufs - 1;
2498 
2499 	fd = setup_listening_socket(receive_port, ipv6);
2500 	if (is_sink)
2501 		send_port = -1;
2502 
2503 	if (fd == -1)
2504 		return 1;
2505 
2506 	atexit(show_stats);
2507 	sa.sa_handler = sig_int;
2508 	sa.sa_flags = SA_RESTART;
2509 	sigaction(SIGINT, &sa, NULL);
2510 
2511 	ret = init_ring(&ring, MAX_CONNS * 3);
2512 	if (ret)
2513 		return ret;
2514 
2515 	printf("Backend: sqpoll=%d, defer_tw=%d, fixed_files=%d, "
2516 		"is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d, "
2517 		"receive_port=%d, napi=%d, napi_timeout=%d, huge_page=%d\n",
2518 			sqpoll, defer_tw, fixed_files, is_sink,
2519 			buf_size, nr_bufs, host, send_port, receive_port,
2520 			napi, napi_timeout, use_huge);
2521 	printf(" recv options: recvmsg=%d, recv_mshot=%d, recv_bundle=%d\n",
2522 			rcv_msg, recv_mshot, rcv_bundle);
2523 	printf(" send options: sendmsg=%d, send_ring=%d, send_bundle=%d, "
2524 		"send_zerocopy=%d\n", snd_msg, send_ring, snd_bundle,
2525 			snd_zc);
2526 
2527 	return parent_loop(&ring, fd);
2528 }
2529