1 /* SPDX-License-Identifier: MIT */
2 /*
3  * Sample program that can act either as a packet sink, where it just receives
4  * packets and doesn't do anything with them, or it can act as a proxy where it
5  * receives packets and then sends them to a new destination. The proxy can
6  * be unidirectional (-B0), or bi-directional (-B1).
7  *
8  * Examples:
9  *
10  * Act as a proxy, listening on port 4444, and send data to 192.168.2.6 on port
11  * 4445. Use multishot receive, DEFER_TASKRUN, and fixed files
12  *
13  * 	./proxy -m1 -r4444 -H 192.168.2.6 -p4445
14  *
15  * Same as above, but utilize send bundles (-C1, requires -u1 send_ring) as well
16  * with ring provided send buffers, and recv bundles (-c1).
17  *
18  * 	./proxy -m1 -c1 -u1 -C1 -r4444 -H 192.168.2.6 -p4445
19  *
20  * Act as a bi-directional proxy, listening on port 8888, and send data back
21  * and forth between host and 192.168.2.6 on port 22. Use multishot receive,
22  * DEFER_TASKRUN, fixed files, and buffers of size 1500.
23  *
24  * 	./proxy -m1 -B1 -b1500 -r8888 -H 192.168.2.6 -p22
25  *
26  * Act as a sink, listening on port 4445, using multishot receive, DEFER_TASKRUN,
27  * and fixed files:
28  *
29  * 	./proxy -m1 -s1 -r4445
30  *
31  * Run with -h to see a list of options, and their defaults.
32  *
33  * (C) 2024 Jens Axboe <axboe@kernel.dk>
34  *
35  */
36 #include <fcntl.h>
37 #include <stdint.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <arpa/inet.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/socket.h>
45 #include <sys/time.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48 #include <linux/mman.h>
49 #include <locale.h>
50 #include <assert.h>
51 #include <pthread.h>
52 #include <liburing.h>
53 
54 #include "proxy.h"
55 #include "helpers.h"
56 
57 /*
58  * Will go away once/if bundles are upstreamed and we put the generic
59  * definitions in the kernel header.
60  */
61 #ifndef IORING_RECVSEND_BUNDLE
62 #define IORING_RECVSEND_BUNDLE		(1U << 4)
63 #endif
64 #ifndef IORING_FEAT_SEND_BUF_SELECT
65 #define IORING_FEAT_SEND_BUF_SELECT	(1U << 14)
66 #endif
67 
68 static int cur_bgid = 1;
69 static int nr_conns;
70 static int open_conns;
71 static long page_size;
72 
73 static unsigned long event_loops;
74 static unsigned long events;
75 
76 static int recv_mshot = 1;
77 static int sqpoll;
78 static int defer_tw = 1;
79 static int is_sink;
80 static int fixed_files = 1;
81 static char *host = "192.168.3.2";
82 static int send_port = 4445;
83 static int receive_port = 4444;
84 static int buf_size = 32;
85 static int bidi;
86 static int ipv6;
87 static int napi;
88 static int napi_timeout;
89 static int wait_batch = 1;
90 static int wait_usec = 1000000;
91 static int rcv_msg;
92 static int snd_msg;
93 static int snd_zc;
94 static int send_ring = -1;
95 static int snd_bundle;
96 static int rcv_bundle;
97 static int use_huge;
98 static int ext_stat;
99 static int verbose;
100 
101 static int nr_bufs = 256;
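/*
 * Buffer ring mask handed to io_uring_buf_ring_add() below. It is presumably
 * filled in at startup via io_uring_buf_ring_mask(nr_bufs); that setup code
 * is outside this section.
 */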
102 static int br_mask;
103 
104 static int ring_size = 128;
105 
106 static pthread_mutex_t thread_lock;
107 static struct timeval last_housekeeping;
108 
109 /*
110  * For sendmsg/recvmsg. recvmsg just has a single vec; sendmsg will have
111  * two vecs - one that is currently submitted and being sent, and one that
112  * is being prepared. When a new sendmsg is issued, we'll swap which one we
113  * use. For send, even though we don't pass in the iovec itself, we use the
114  * vec to serialize the sends to avoid reordering.
115  */
116 struct msg_vec {
117 	struct iovec *iov;
118 	/* length of allocated vec */
119 	int vec_size;
120 	/* length currently being used */
121 	int iov_len;
122 	/* only for send, current index we're processing */
123 	int cur_iov;
124 };
125 
126 struct io_msg {
127 	struct msghdr msg;
128 	struct msg_vec vecs[2];
129 	/* current msg_vec being prepared */
130 	int vec_index;
131 };
132 
133 /*
134  * Per socket stats per connection. For bi-directional, we'll have both
135  * sends and receives on each socket; this helps track them separately.
136  * For sink or one directional, each of the two stats will be only sends
137  * or receives, not both.
138  */
139 struct conn_dir {
140 	int index;
141 
142 	int pending_shutdown;
143 	int pending_send;
144 	int pending_recv;
145 
146 	int snd_notif;
147 
148 	int out_buffers;
149 
150 	int rcv, rcv_shrt, rcv_enobufs, rcv_mshot;
151 	int snd, snd_shrt, snd_enobufs, snd_busy, snd_mshot;
152 
153 	int snd_next_bid;
154 	int rcv_next_bid;
155 
156 	int *rcv_bucket;
157 	int *snd_bucket;
158 
159 	unsigned long in_bytes, out_bytes;
160 
161 	/* only ever have a single recv pending */
162 	struct io_msg io_rcv_msg;
163 
164 	/* one send that is in flight, and one being prepared for the next send */
165 	struct io_msg io_snd_msg;
166 };
167 
168 enum {
169 	CONN_F_STARTED		= 1,
170 	CONN_F_DISCONNECTING	= 2,
171 	CONN_F_DISCONNECTED	= 4,
172 	CONN_F_PENDING_SHUTDOWN	= 8,
173 	CONN_F_STATS_SHOWN	= 16,
174 	CONN_F_END_TIME		= 32,
175 	CONN_F_REAPED		= 64,
176 };
177 
178 /*
179  * buffer ring belonging to a connection
180  */
181 struct conn_buf_ring {
182 	struct io_uring_buf_ring *br;
183 	void *buf;
184 	int bgid;
185 };
186 
187 struct conn {
188 	struct io_uring ring;
189 
190 	/* receive side buffer ring, new data arrives here */
191 	struct conn_buf_ring in_br;
192 	/* if send_ring is used, outgoing data to send */
193 	struct conn_buf_ring out_br;
194 
195 	int tid;
196 	int in_fd, out_fd;
197 	int pending_cancels;
198 	int flags;
199 
200 	struct conn_dir cd[2];
201 
202 	struct timeval start_time, end_time;
203 
204 	union {
205 		struct sockaddr_in addr;
206 		struct sockaddr_in6 addr6;
207 	};
208 
209 	pthread_t thread;
210 	pthread_barrier_t startup_barrier;
211 };
212 
213 #define MAX_CONNS	1024
214 static struct conn conns[MAX_CONNS];
215 
216 #define vlog(str, ...) do {						\
217 	if (verbose)							\
218 		printf(str, ##__VA_ARGS__);				\
219 } while (0)
220 
221 static int prep_next_send(struct io_uring *ring, struct conn *c,
222 			  struct conn_dir *cd, int fd);
223 static void *thread_main(void *data);
224 
225 static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
226 {
227 	struct userdata ud = { .val = cqe->user_data };
228 
229 	return &conns[ud.op_tid & TID_MASK];
230 }
231 
232 static struct conn_dir *cqe_to_conn_dir(struct conn *c,
233 					struct io_uring_cqe *cqe)
234 {
235 	int fd = cqe_to_fd(cqe);
236 
237 	return &c->cd[fd != c->in_fd];
238 }
239 
240 static int other_dir_fd(struct conn *c, int fd)
241 {
242 	if (c->in_fd == fd)
243 		return c->out_fd;
244 	return c->in_fd;
245 }
246 
247 /* currently active msg_vec */
248 static struct msg_vec *msg_vec(struct io_msg *imsg)
249 {
250 	return &imsg->vecs[imsg->vec_index];
251 }
252 
253 static struct msg_vec *snd_msg_vec(struct conn_dir *cd)
254 {
255 	return msg_vec(&cd->io_snd_msg);
256 }
257 
258 /*
259  * Goes from accept new connection -> create socket, connect to end
260  * point, prepare recv, on receive do send (unless sink). If either ends
261  * disconnects, we transition to shutdown and then close.
262  */
263 enum {
264 	__ACCEPT	= 1,
265 	__SOCK		= 2,
266 	__CONNECT	= 3,
267 	__RECV		= 4,
268 	__RECVMSG	= 5,
269 	__SEND		= 6,
270 	__SENDMSG	= 7,
271 	__SHUTDOWN	= 8,
272 	__CANCEL	= 9,
273 	__CLOSE		= 10,
274 	__FD_PASS	= 11,
275 	__NOP		= 12,
276 	__STOP		= 13,
277 };
278 
279 struct error_handler {
280 	const char *name;
281 	int (*error_fn)(struct error_handler *, struct io_uring *, struct io_uring_cqe *);
282 };
283 
284 static int recv_error(struct error_handler *err, struct io_uring *ring,
285 		      struct io_uring_cqe *cqe);
286 static int send_error(struct error_handler *err, struct io_uring *ring,
287 		      struct io_uring_cqe *cqe);
288 
289 static int default_error(struct error_handler *err,
290 			 struct io_uring __attribute__((__unused__)) *ring,
291 			 struct io_uring_cqe *cqe)
292 {
293 	struct conn *c = cqe_to_conn(cqe);
294 
295 	fprintf(stderr, "%d: %s error %s\n", c->tid, err->name, strerror(-cqe->res));
296 	fprintf(stderr, "fd=%d, bid=%d\n", cqe_to_fd(cqe), cqe_to_bid(cqe));
297 	return 1;
298 }
299 
300 /*
301  * Move error handling out of the normal handling path, cleanly separating
302  * them. If an opcode doesn't need any error handling, set it to NULL. If
303  * it wants to stop the connection at that point and not do anything else,
304  * then the default handler can be used. Only receive has proper error
305  * handling, as we can get -ENOBUFS which is not a fatal condition. It just
306  * means we need to wait on buffer replenishing before re-arming the receive.
307  */
308 static struct error_handler error_handlers[] = {
309 	{ .name = "NULL",	.error_fn = NULL, },
310 	{ .name = "ACCEPT",	.error_fn = default_error, },
311 	{ .name = "SOCK",	.error_fn = default_error, },
312 	{ .name = "CONNECT",	.error_fn = default_error, },
313 	{ .name = "RECV",	.error_fn = recv_error, },
314 	{ .name = "RECVMSG",	.error_fn = recv_error, },
315 	{ .name = "SEND",	.error_fn = send_error, },
316 	{ .name = "SENDMSG",	.error_fn = send_error, },
317 	{ .name = "SHUTDOWN",	.error_fn = NULL, },
318 	{ .name = "CANCEL",	.error_fn = NULL, },
319 	{ .name = "CLOSE",	.error_fn = NULL, },
320 	{ .name = "FD_PASS",	.error_fn = default_error, },
321 	{ .name = "NOP",	.error_fn = NULL, },
322 	{ .name = "STOP",	.error_fn = default_error, },
323 };
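/*
 * Note: error_handlers[] is indexed directly by the opcode encoded into
 * cqe->user_data (see handle_cqe()), so its entries must stay in the same
 * order as the __ACCEPT..__STOP enum above, with index 0 unused.
 */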
324 
325 static void free_buffer_ring(struct io_uring *ring, struct conn_buf_ring *cbr)
326 {
327 	if (!cbr->br)
328 		return;
329 
330 	io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
331 	cbr->br = NULL;
332 	if (use_huge)
333 		munmap(cbr->buf, buf_size * nr_bufs);
334 	else
335 		free(cbr->buf);
336 }
337 
338 static void free_buffer_rings(struct io_uring *ring, struct conn *c)
339 {
340 	free_buffer_ring(ring, &c->in_br);
341 	free_buffer_ring(ring, &c->out_br);
342 }
343 
344 /*
345  * Setup a ring provided buffer ring for each connection. If we get -ENOBUFS
346  * on receive, for multishot receive we'll wait for half the provided buffers
347  * to be returned by pending sends, then re-arm the multishot receive. If
348  * this happens too frequently (see enobufs= stat), then the ring size is
349  * likely too small. Use -nXX to make it bigger. See recv_enobufs().
350  *
351  * The alternative here would be to use the older style provided buffers,
352  * where you simply setup a buffer group and use SQEs with
353  * io_uring_prep_provide_buffers() to add to the pool. But that approach is
354  * slower and has been deprecated in favor of the faster ring provided buffers.
355  */
356 static int setup_recv_ring(struct io_uring *ring, struct conn *c)
357 {
358 	struct conn_buf_ring *cbr = &c->in_br;
359 	int ret, i;
360 	size_t len;
361 	void *ptr;
362 
363 	len = buf_size * nr_bufs;
364 	if (use_huge) {
365 		cbr->buf = mmap(NULL, len, PROT_READ|PROT_WRITE,
366 				MAP_PRIVATE|MAP_HUGETLB|MAP_HUGE_2MB|MAP_ANONYMOUS,
367 				-1, 0);
368 		if (cbr->buf == MAP_FAILED) {
369 			perror("mmap");
370 			return 1;
371 		}
372 	} else {
373 		if (posix_memalign(&cbr->buf, page_size, len)) {
374 			perror("posix memalign");
375 			return 1;
376 		}
377 	}
378 	cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
379 	if (!cbr->br) {
380 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
381 		return 1;
382 	}
383 
384 	ptr = cbr->buf;
385 	for (i = 0; i < nr_bufs; i++) {
386 		vlog("%d: add bid %d, data %p\n", c->tid, i, ptr);
387 		io_uring_buf_ring_add(cbr->br, ptr, buf_size, i, br_mask, i);
388 		ptr += buf_size;
389 	}
390 	io_uring_buf_ring_advance(cbr->br, nr_bufs);
391 	printf("%d: recv buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
392 	return 0;
393 }
394 
395 /*
396  * If 'send_ring' is used and the kernel supports it, we can skip serializing
397  * sends as the data will be ordered regardless. This reduces the send handling
398  * complexity, as buffers can always be added to the outgoing ring and will be
399  * processed in the order in which they were added.
400  */
401 static int setup_send_ring(struct io_uring *ring, struct conn *c)
402 {
403 	struct conn_buf_ring *cbr = &c->out_br;
404 	int ret;
405 
406 	cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
407 	if (!cbr->br) {
408 		fprintf(stderr, "Buffer ring register failed %d\n", ret);
409 		return 1;
410 	}
411 
412 	printf("%d: send buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
413 	return 0;
414 }
415 
416 static int setup_send_zc(struct io_uring *ring, struct conn *c)
417 {
418 	struct iovec *iovs;
419 	void *buf;
420 	int i, ret;
421 
422 	if (snd_msg)
423 		return 0;
424 
425 	buf = c->in_br.buf;
426 	iovs = calloc(nr_bufs, sizeof(struct iovec));
427 	for (i = 0; i < nr_bufs; i++) {
428 		iovs[i].iov_base = buf;
429 		iovs[i].iov_len = buf_size;
430 		buf += buf_size;
431 	}
432 
433 	ret = io_uring_register_buffers(ring, iovs, nr_bufs);
434 	if (ret) {
435 		fprintf(stderr, "failed registering buffers: %d\n", ret);
436 		free(iovs);
437 		return ret;
438 	}
439 	free(iovs);
440 	return 0;
441 }
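/*
 * Note that the buffers registered above alias the receive side buffers
 * (c->in_br.buf), which is what allows submit_send() to reference them by
 * buffer index with IORING_RECVSEND_FIXED_BUF for zero-copy sends.
 */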
442 
443 /*
444  * Setup an input and output buffer ring.
445  */
446 static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
447 {
448 	int ret;
449 
450 	/* no locking needed on cur_bgid, parent serializes setup */
451 	c->in_br.bgid = cur_bgid++;
452 	c->out_br.bgid = cur_bgid++;
453 	c->out_br.br = NULL;
454 
455 	ret = setup_recv_ring(ring, c);
456 	if (ret)
457 		return ret;
458 	if (is_sink)
459 		return 0;
460 	if (snd_zc) {
461 		ret = setup_send_zc(ring, c);
462 		if (ret)
463 			return ret;
464 	}
465 	if (send_ring) {
466 		ret = setup_send_ring(ring, c);
467 		if (ret) {
468 			free_buffer_ring(ring, &c->in_br);
469 			return ret;
470 		}
471 	}
472 
473 	return 0;
474 }
475 
476 struct bucket_stat {
477 	int nr_packets;
478 	int count;
479 };
480 
481 static int stat_cmp(const void *p1, const void *p2)
482 {
483 	const struct bucket_stat *b1 = p1;
484 	const struct bucket_stat *b2 = p2;
485 
486 	if (b1->count < b2->count)
487 		return 1;
488 	else if (b1->count > b2->count)
489 		return -1;
490 	return 0;
491 }
492 
493 static void show_buckets(struct conn_dir *cd)
494 {
495 	unsigned long snd_total, rcv_total;
496 	struct bucket_stat *rstat, *sstat;
497 	int i;
498 
499 	if (!cd->rcv_bucket || !cd->snd_bucket)
500 		return;
501 
502 	rstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
503 	sstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
504 
505 	snd_total = rcv_total = 0;
506 	for (i = 0; i <= nr_bufs; i++) {
507 		snd_total += cd->snd_bucket[i];
508 		sstat[i].nr_packets = i;
509 		sstat[i].count = cd->snd_bucket[i];
510 		rcv_total += cd->rcv_bucket[i];
511 		rstat[i].nr_packets = i;
512 		rstat[i].count = cd->rcv_bucket[i];
513 	}
514 
515 	if (!snd_total && !rcv_total) {
516 		free(sstat);
517 		free(rstat);
		return;
518 	}
519 	if (snd_total)
520 		qsort(sstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
521 	if (rcv_total)
522 		qsort(rstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
523 
524 	printf("\t Packets per recv/send:\n");
525 	for (i = 0; i <= nr_bufs; i++) {
526 		double snd_prc = 0.0, rcv_prc = 0.0;
527 		if (!rstat[i].count && !sstat[i].count)
528 			continue;
529 		if (rstat[i].count)
530 			rcv_prc = 100.0 * (rstat[i].count / (double) rcv_total);
531 		if (sstat[i].count)
532 			snd_prc = 100.0 * (sstat[i].count / (double) snd_total);
533 		printf("\t bucket(%3d/%3d): rcv=%u (%.2f%%) snd=%u (%.2f%%)\n",
534 				rstat[i].nr_packets, sstat[i].nr_packets,
535 				rstat[i].count, rcv_prc,
536 				sstat[i].count, snd_prc);
537 	}
538 
539 	free(sstat);
540 	free(rstat);
541 }
542 
543 static void __show_stats(struct conn *c)
544 {
545 	unsigned long msec, qps;
546 	unsigned long bytes, bw;
547 	struct conn_dir *cd;
548 	int i;
549 
550 	if (c->flags & (CONN_F_STATS_SHOWN | CONN_F_REAPED))
551 		return;
552 	if (!(c->flags & CONN_F_STARTED))
553 		return;
554 
555 	if (!(c->flags & CONN_F_END_TIME))
556 		gettimeofday(&c->end_time, NULL);
557 
558 	msec = (c->end_time.tv_sec - c->start_time.tv_sec) * 1000;
559 	msec += (c->end_time.tv_usec - c->start_time.tv_usec) / 1000;
560 
561 	qps = 0;
562 	for (i = 0; i < 2; i++)
563 		qps += c->cd[i].rcv + c->cd[i].snd;
564 
565 	if (!qps)
566 		return;
567 
568 	if (msec)
569 		qps = (qps * 1000) / msec;
570 
571 	printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
572 					c->in_fd, c->out_fd, qps, msec);
573 
574 	bytes = 0;
575 	for (i = 0; i < 2; i++) {
576 		cd = &c->cd[i];
577 
578 		if (!cd->in_bytes && !cd->out_bytes && !cd->snd && !cd->rcv)
579 			continue;
580 
581 		bytes += cd->in_bytes;
582 		bytes += cd->out_bytes;
583 
584 		printf("\t%3d: rcv=%u (short=%u, enobufs=%d), snd=%u (short=%u,"
585 			" busy=%u, enobufs=%d)\n", i, cd->rcv, cd->rcv_shrt,
586 			cd->rcv_enobufs, cd->snd, cd->snd_shrt, cd->snd_busy,
587 			cd->snd_enobufs);
588 		printf("\t   : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
589 			cd->in_bytes, cd->in_bytes >> 10,
590 			cd->out_bytes, cd->out_bytes >> 10);
591 		printf("\t   : mshot_rcv=%d, mshot_snd=%d\n", cd->rcv_mshot,
592 			cd->snd_mshot);
593 		show_buckets(cd);
594 
595 	}
596 	if (msec) {
597 		bytes *= 8UL;
598 		bw = bytes / 1000;
599 		bw /= msec;
600 		printf("\tBW=%'luMbit\n", bw);
601 	}
602 
603 	c->flags |= CONN_F_STATS_SHOWN;
604 }
605 
606 static void show_stats(void)
607 {
608 	float events_per_loop = 0.0;
609 	static int stats_shown;
610 	int i;
611 
612 	if (stats_shown)
613 		return;
614 
615 	if (events)
616 		events_per_loop = (float) events / (float) event_loops;
617 
618 	printf("Event loops: %lu, events %lu, events per loop %.2f\n", event_loops,
619 							events, events_per_loop);
620 
621 	for (i = 0; i < MAX_CONNS; i++) {
622 		struct conn *c = &conns[i];
623 
624 		__show_stats(c);
625 	}
626 	stats_shown = 1;
627 }
628 
629 static void sig_int(int __attribute__((__unused__)) sig)
630 {
631 	printf("\n");
632 	show_stats();
633 	exit(1);
634 }
635 
636 /*
637  * Special cased for SQPOLL only, as we don't control when SQEs are consumed if
638  * that is used. Hence we may need to wait for the SQPOLL thread to keep up
639  * until we can get a new SQE. All other cases will break immediately, with a
640  * fresh SQE.
641  *
642  * If we grossly undersized our SQ ring, getting a NULL sqe can happen even
643  * for the !SQPOLL case if we're handling a lot of CQEs in our event loop
644  * and multishot isn't used. We can do io_uring_submit() to flush what we
645  * have here. Only caveat here is that if linked requests are used, SQEs
646  * would need to be allocated upfront as a link chain is only valid within
647  * a single submission cycle.
648  */
649 static struct io_uring_sqe *get_sqe(struct io_uring *ring)
650 {
651 	struct io_uring_sqe *sqe;
652 
653 	do {
654 		sqe = io_uring_get_sqe(ring);
655 		if (sqe)
656 			break;
657 		if (!sqpoll)
658 			io_uring_submit(ring);
659 		else
660 			io_uring_sqring_wait(ring);
661 	} while (1);
662 
663 	return sqe;
664 }
665 
666 /*
667  * See __encode_userdata() for how we encode sqe->user_data, which is passed
668  * back as cqe->user_data at completion time.
669  */
670 static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
671 			    int bid, int fd)
672 {
673 	__encode_userdata(sqe, c->tid, op, bid, fd);
674 }
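/*
 * On the completion side, cqe_to_op(), cqe_to_bid() and cqe_to_fd() (defined
 * alongside __encode_userdata() outside this section) pull the individual
 * fields back out of cqe->user_data, and cqe_to_conn() above maps the tid
 * portion to the owning connection.
 */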
675 
676 static void __submit_receive(struct io_uring *ring, struct conn *c,
677 			     struct conn_dir *cd, int fd)
678 {
679 	struct conn_buf_ring *cbr = &c->in_br;
680 	struct io_uring_sqe *sqe;
681 
682 	vlog("%d: submit receive fd=%d\n", c->tid, fd);
683 
684 	assert(!cd->pending_recv);
685 	cd->pending_recv = 1;
686 
687 	/*
688 	 * For both recv and multishot receive, we use the ring provided
689 	 * buffers. These are handed to the application ahead of time, and
690 	 * are consumed when a receive triggers. Note that the address and
691 	 * length of the receive are set to NULL/0, and we assign the
692 	 * sqe->buf_group to tell the kernel which buffer group ID to pick
693 	 * a buffer from. Finally, IOSQE_BUFFER_SELECT is set to tell the
694 	 * kernel that we want a buffer picked for this request, we are not
695 	 * passing one in with the request.
696 	 */
697 	sqe = get_sqe(ring);
698 	if (rcv_msg) {
699 		struct io_msg *imsg = &cd->io_rcv_msg;
700 		struct msghdr *msg = &imsg->msg;
701 
702 		memset(msg, 0, sizeof(*msg));
703 		msg->msg_iov = msg_vec(imsg)->iov;
704 		msg->msg_iovlen = msg_vec(imsg)->iov_len;
705 
706 		if (recv_mshot) {
707 			cd->rcv_mshot++;
708 			io_uring_prep_recvmsg_multishot(sqe, fd, &imsg->msg, 0);
709 		} else {
710 			io_uring_prep_recvmsg(sqe, fd, &imsg->msg, 0);
711 		}
712 	} else {
713 		if (recv_mshot) {
714 			cd->rcv_mshot++;
715 			io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0);
716 		} else {
717 			io_uring_prep_recv(sqe, fd, NULL, 0, 0);
718 		}
719 	}
720 	encode_userdata(sqe, c, __RECV, 0, fd);
721 	sqe->buf_group = cbr->bgid;
722 	sqe->flags |= IOSQE_BUFFER_SELECT;
723 	if (fixed_files)
724 		sqe->flags |= IOSQE_FIXED_FILE;
725 	if (rcv_bundle)
726 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
727 }
728 
729 /*
730  * One directional just arms receive on our in_fd
731  */
732 static void submit_receive(struct io_uring *ring, struct conn *c)
733 {
734 	__submit_receive(ring, c, &c->cd[0], c->in_fd);
735 }
736 
737 /*
738  * Bi-directional arms receive on both in and out fd
739  */
740 static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
741 {
742 	__submit_receive(ring, c, &c->cd[0], c->in_fd);
743 	__submit_receive(ring, c, &c->cd[1], c->out_fd);
744 }
745 
746 /*
747  * We hit -ENOBUFS, which means that we ran out of buffers in our current
748  * provided buffer group. This can happen if there's an imbalance between the
749  * receives coming in and the sends being processed, particularly with multishot
750  * receive as they can trigger very quickly. If this happens, defer arming a
751  * new receive until we've replenished half of the buffer pool by processing
752  * pending sends.
753  */
754 static void recv_enobufs(struct io_uring *ring, struct conn *c,
755 			 struct conn_dir *cd, int fd)
756 {
757 	vlog("%d: enobufs hit\n", c->tid);
758 
759 	cd->rcv_enobufs++;
760 
761 	/*
762 	 * If we're a sink, just re-arm the receive. If we're not, prep the
763 	 * next send if one isn't already pending; if a send got queued, its
764 	 * completion will kick the recv re-arm, otherwise re-arm it here.
765 	 */
766 	if (!is_sink) {
767 		int do_recv_arm = 1;
768 
769 		if (!cd->pending_send)
770 			do_recv_arm = !prep_next_send(ring, c, cd, fd);
771 		if (do_recv_arm)
772 			__submit_receive(ring, c, &c->cd[0], c->in_fd);
773 	} else {
774 		__submit_receive(ring, c, &c->cd[0], c->in_fd);
775 	}
776 }
777 
778 /*
779  * Kill this socket - submit a shutdown and link a close to it. We don't
780  * care about shutdown status, so mark it as not needing to post a CQE unless
781  * it fails.
782  */
783 static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
784 {
785 	struct io_uring_sqe *sqe1, *sqe2;
786 
787 	/*
788 	 * On the off chance that we run out of SQEs after the first one,
789 	 * grab two upfront. This is to prevent our link from breaking if
790 	 * get_sqe() ends up doing submissions to free up an SQE, as links
791 	 * are not valid across separate submissions.
792 	 */
793 	sqe1 = get_sqe(ring);
794 	sqe2 = get_sqe(ring);
795 
796 	io_uring_prep_shutdown(sqe1, fd, SHUT_RDWR);
797 	if (fixed_files)
798 		sqe1->flags |= IOSQE_FIXED_FILE;
799 	sqe1->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
800 	encode_userdata(sqe1, c, __SHUTDOWN, 0, fd);
801 
802 	if (fixed_files)
803 		io_uring_prep_close_direct(sqe2, fd);
804 	else
805 		io_uring_prep_close(sqe2, fd);
806 	encode_userdata(sqe2, c, __CLOSE, 0, fd);
807 }
808 
809 /*
810  * This connection is going away, queue a cancel for any pending receive
811  * we have on this ring. For completeness, we issue a cancel for any request
812  * we have pending on both in_fd and out_fd.
813  */
814 static void queue_cancel(struct io_uring *ring, struct conn *c)
815 {
816 	struct io_uring_sqe *sqe;
817 	int flags = 0;
818 
819 	if (fixed_files)
820 		flags |= IORING_ASYNC_CANCEL_FD_FIXED;
821 
822 	sqe = get_sqe(ring);
823 	io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
824 	encode_userdata(sqe, c, __CANCEL, 0, c->in_fd);
825 	c->pending_cancels++;
826 
827 	if (c->out_fd != -1) {
828 		sqe = get_sqe(ring);
829 		io_uring_prep_cancel_fd(sqe, c->out_fd, flags);
830 		encode_userdata(sqe, c, __CANCEL, 0, c->out_fd);
831 		c->pending_cancels++;
832 	}
833 
834 	io_uring_submit(ring);
835 }
836 
837 static int pending_shutdown(struct conn *c)
838 {
839 	return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
840 }
841 
842 static bool should_shutdown(struct conn *c)
843 {
844 	int i;
845 
846 	if (!pending_shutdown(c))
847 		return false;
848 	if (is_sink)
849 		return true;
850 	if (!bidi)
851 		return c->cd[0].in_bytes == c->cd[1].out_bytes;
852 
853 	for (i = 0; i < 2; i++) {
854 		if (c->cd[0].rcv != c->cd[1].snd)
855 			return false;
856 		if (c->cd[1].rcv != c->cd[0].snd)
857 			return false;
858 	}
859 
860 	return true;
861 }
862 
863 /*
864  * Close this connection - send a ring message to the connection with intent
865  * to stop. When the client gets the message, it will initiate the stop.
866  */
867 static void __close_conn(struct io_uring *ring, struct conn *c)
868 {
869 	struct io_uring_sqe *sqe;
870 	uint64_t user_data;
871 
872 	printf("Client %d: queueing stop\n", c->tid);
873 
874 	user_data = __raw_encode(c->tid, __STOP, 0, 0);
875 	sqe = io_uring_get_sqe(ring);
876 	io_uring_prep_msg_ring(sqe, c->ring.ring_fd, 0, user_data, 0);
877 	encode_userdata(sqe, c, __NOP, 0, 0);
878 	io_uring_submit(ring);
879 }
880 
881 static void close_cd(struct conn *c, struct conn_dir *cd)
882 {
883 	cd->pending_shutdown = 1;
884 
885 	if (cd->pending_send)
886 		return;
887 
888 	if (!(c->flags & CONN_F_PENDING_SHUTDOWN)) {
889 		gettimeofday(&c->end_time, NULL);
890 		c->flags |= CONN_F_PENDING_SHUTDOWN | CONN_F_END_TIME;
891 	}
892 }
893 
894 /*
895  * We're done with this buffer, add it back to our pool so the kernel is
896  * free to use it again.
897  */
898 static int replenish_buffer(struct conn_buf_ring *cbr, int bid, int offset)
899 {
900 	void *this_buf = cbr->buf + bid * buf_size;
901 
902 	assert(bid < nr_bufs);
903 
904 	io_uring_buf_ring_add(cbr->br, this_buf, buf_size, bid, br_mask, offset);
905 	return buf_size;
906 }
907 
908 /*
909  * Iterate buffers from '*bid' and with a total size of 'bytes' and add them
910  * back to our receive ring so they can be reused for new receives.
911  */
912 static int replenish_buffers(struct conn *c, int *bid, int bytes)
913 {
914 	struct conn_buf_ring *cbr = &c->in_br;
915 	int nr_packets = 0;
916 
917 	while (bytes) {
918 		int this_len = replenish_buffer(cbr, *bid, nr_packets);
919 
920 		if (this_len > bytes)
921 			this_len = bytes;
922 		bytes -= this_len;
923 
924 		*bid = (*bid + 1) & (nr_bufs - 1);
925 		nr_packets++;
926 	}
927 
928 	io_uring_buf_ring_advance(cbr->br, nr_packets);
929 	return nr_packets;
930 }
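/*
 * The "& (nr_bufs - 1)" wrap above (and in the recv/send handlers below)
 * assumes nr_bufs is a power of two, which the default of 256 satisfies.
 */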
931 
932 static void free_mvec(struct msg_vec *mvec)
933 {
934 	free(mvec->iov);
935 	mvec->iov = NULL;
936 }
937 
938 static void init_mvec(struct msg_vec *mvec)
939 {
940 	memset(mvec, 0, sizeof(*mvec));
941 	mvec->iov = malloc(sizeof(struct iovec));
942 	mvec->vec_size = 1;
943 }
944 
945 static void init_msgs(struct conn_dir *cd)
946 {
947 	memset(&cd->io_snd_msg, 0, sizeof(cd->io_snd_msg));
948 	memset(&cd->io_rcv_msg, 0, sizeof(cd->io_rcv_msg));
949 	init_mvec(&cd->io_snd_msg.vecs[0]);
950 	init_mvec(&cd->io_snd_msg.vecs[1]);
951 	init_mvec(&cd->io_rcv_msg.vecs[0]);
952 }
953 
954 static void free_msgs(struct conn_dir *cd)
955 {
956 	free_mvec(&cd->io_snd_msg.vecs[0]);
957 	free_mvec(&cd->io_snd_msg.vecs[1]);
958 	free_mvec(&cd->io_rcv_msg.vecs[0]);
959 }
960 
961 /*
962  * Multishot accept completion triggered. If we're acting as a sink, we're
963  * good to go. Just issue a receive for that case. If we're acting as a proxy,
964  * then start opening a socket that we can use to connect to the other end.
965  */
966 static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
967 {
968 	struct conn *c;
969 	int i;
970 
971 	if (nr_conns == MAX_CONNS) {
972 		fprintf(stderr, "max clients reached %d\n", nr_conns);
973 		return 1;
974 	}
975 
976 	/* main thread handles this, which is obviously serialized */
977 	c = &conns[nr_conns];
978 	c->tid = nr_conns++;
979 	c->in_fd = -1;
980 	c->out_fd = -1;
981 
982 	for (i = 0; i < 2; i++) {
983 		struct conn_dir *cd = &c->cd[i];
984 
985 		cd->index = i;
986 		cd->snd_next_bid = -1;
987 		cd->rcv_next_bid = -1;
988 		if (ext_stat) {
989 			cd->rcv_bucket = calloc(nr_bufs + 1, sizeof(int));
990 			cd->snd_bucket = calloc(nr_bufs + 1, sizeof(int));
991 		}
992 		init_msgs(cd);
993 	}
994 
995 	printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
996 	gettimeofday(&c->start_time, NULL);
997 
998 	pthread_barrier_init(&c->startup_barrier, NULL, 2);
999 	pthread_create(&c->thread, NULL, thread_main, c);
1000 
1001 	/*
1002 	 * Wait for thread to have its ring setup, then either assign the fd
1003 	 * if it's non-fixed, or pass the fixed one
1004 	 */
1005 	pthread_barrier_wait(&c->startup_barrier);
1006 	if (!fixed_files) {
1007 		c->in_fd = cqe->res;
1008 	} else {
1009 		struct io_uring_sqe *sqe;
1010 		uint64_t user_data;
1011 
1012 		/*
1013 		 * Ring has just been setup, we'll use index 0 as the descriptor
1014 		 * value.
1015 		 */
1016 		user_data = __raw_encode(c->tid, __FD_PASS, 0, 0);
1017 		sqe = io_uring_get_sqe(ring);
1018 		io_uring_prep_msg_ring_fd(sqe, c->ring.ring_fd, cqe->res, 0,
1019 						user_data, 0);
1020 		encode_userdata(sqe, c, __NOP, 0, cqe->res);
1021 	}
1022 
1023 	return 0;
1024 }
1025 
1026 /*
1027  * Our socket request completed, issue a connect request to the other end.
1028  */
1029 static int handle_sock(struct io_uring *ring, struct io_uring_cqe *cqe)
1030 {
1031 	struct conn *c = cqe_to_conn(cqe);
1032 	struct io_uring_sqe *sqe;
1033 	int ret;
1034 
1035 	vlog("%d: sock: res=%d\n", c->tid, cqe->res);
1036 
1037 	c->out_fd = cqe->res;
1038 
1039 	if (ipv6) {
1040 		memset(&c->addr6, 0, sizeof(c->addr6));
1041 		c->addr6.sin6_family = AF_INET6;
1042 		c->addr6.sin6_port = htons(send_port);
1043 		ret = inet_pton(AF_INET6, host, &c->addr6.sin6_addr);
1044 	} else {
1045 		memset(&c->addr, 0, sizeof(c->addr));
1046 		c->addr.sin_family = AF_INET;
1047 		c->addr.sin_port = htons(send_port);
1048 		ret = inet_pton(AF_INET, host, &c->addr.sin_addr);
1049 	}
1050 	if (ret <= 0) {
1051 		if (!ret)
1052 			fprintf(stderr, "host not in right format\n");
1053 		else
1054 			perror("inet_pton");
1055 		return 1;
1056 	}
1057 
1058 	sqe = get_sqe(ring);
1059 	if (ipv6) {
1060 		io_uring_prep_connect(sqe, c->out_fd,
1061 					(struct sockaddr *) &c->addr6,
1062 					sizeof(c->addr6));
1063 	} else {
1064 		io_uring_prep_connect(sqe, c->out_fd,
1065 					(struct sockaddr *) &c->addr,
1066 					sizeof(c->addr));
1067 	}
1068 	encode_userdata(sqe, c, __CONNECT, 0, c->out_fd);
1069 	if (fixed_files)
1070 		sqe->flags |= IOSQE_FIXED_FILE;
1071 	return 0;
1072 }
1073 
1074 /*
1075  * Connection to the other end is done, submit a receive to start receiving
1076  * data. If we're a bidirectional proxy, issue a receive on both ends. If not,
1077  * then just a single recv will do.
1078  */
1079 static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
1080 {
1081 	struct conn *c = cqe_to_conn(cqe);
1082 
1083 	pthread_mutex_lock(&thread_lock);
1084 	open_conns++;
1085 	pthread_mutex_unlock(&thread_lock);
1086 
1087 	if (bidi)
1088 		submit_bidi_receive(ring, c);
1089 	else
1090 		submit_receive(ring, c);
1091 
1092 	return 0;
1093 }
1094 
1095 /*
1096  * Append new segment to our currently active msg_vec. This will be submitted
1097  * as a sendmsg (with all of it), or as separate sends, later. If we're using
1098  * send_ring, then we won't hit this path. Instead, outgoing buffers are
1099  * added directly to our outgoing send buffer ring.
1100  */
1101 static void send_append_vec(struct conn_dir *cd, void *data, int len)
1102 {
1103 	struct msg_vec *mvec = snd_msg_vec(cd);
1104 
1105 	if (mvec->iov_len == mvec->vec_size) {
1106 		mvec->vec_size <<= 1;
1107 		mvec->iov = realloc(mvec->iov, mvec->vec_size * sizeof(struct iovec));
1108 	}
1109 
1110 	mvec->iov[mvec->iov_len].iov_base = data;
1111 	mvec->iov[mvec->iov_len].iov_len = len;
1112 	mvec->iov_len++;
1113 }
1114 
1115 /*
1116  * Queue a send based on the data received in this cqe, which came from
1117  * a completed receive operation.
1118  */
1119 static void send_append(struct conn *c, struct conn_dir *cd, void *data,
1120 			int bid, int len)
1121 {
1122 	vlog("%d: send %d (%p, bid %d)\n", c->tid, len, data, bid);
1123 
1124 	assert(bid < nr_bufs);
1125 
1126 	/* if using provided buffers for send, add it upfront */
1127 	if (send_ring) {
1128 		struct conn_buf_ring *cbr = &c->out_br;
1129 
1130 		io_uring_buf_ring_add(cbr->br, data, len, bid, br_mask, 0);
1131 		io_uring_buf_ring_advance(cbr->br, 1);
1132 	} else {
1133 		send_append_vec(cd, data, len);
1134 	}
1135 }
1136 
1137 /*
1138  * For non recvmsg && multishot, a zero receive marks the end. For recvmsg
1139  * with multishot, we always get the header regardless. Hence a "zero receive"
1140  * is the size of the header.
1141  */
1142 static int recv_done_res(int res)
1143 {
1144 	if (!res)
1145 		return 1;
1146 	if (rcv_msg && recv_mshot && res == sizeof(struct io_uring_recvmsg_out))
1147 		return 1;
1148 	return 0;
1149 }
1150 
1151 /*
1152  * Any receive that isn't recvmsg with multishot can be handled the same way.
1153  * Iterate from '*bid' and 'in_bytes' in total, and append the data to the
1154  * outgoing queue.
1155  */
1156 static int recv_bids(struct conn *c, struct conn_dir *cd, int *bid, int in_bytes)
1157 {
1158 	struct conn_buf_ring *cbr = &c->out_br;
1159 	struct conn_buf_ring *in_cbr = &c->in_br;
1160 	struct io_uring_buf *buf;
1161 	int nr_packets = 0;
1162 
1163 	while (in_bytes) {
1164 		int this_bytes;
1165 		void *data;
1166 
1167 		buf = &in_cbr->br->bufs[*bid];
1168 		data = (void *) (unsigned long) buf->addr;
1169 		this_bytes = buf->len;
1170 		if (this_bytes > in_bytes)
1171 			this_bytes = in_bytes;
1172 
1173 		in_bytes -= this_bytes;
1174 
1175 		if (send_ring)
1176 			io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1177 						br_mask, nr_packets);
1178 		else
1179 			send_append(c, cd, data, *bid, this_bytes);
1180 
1181 		*bid = (*bid + 1) & (nr_bufs - 1);
1182 		nr_packets++;
1183 	}
1184 
1185 	if (send_ring)
1186 		io_uring_buf_ring_advance(cbr->br, nr_packets);
1187 
1188 	return nr_packets;
1189 }
1190 
1191 /*
1192  * Special handling of recvmsg with multishot
1193  */
1194 static int recv_mshot_msg(struct conn *c, struct conn_dir *cd, int *bid,
1195 			  int in_bytes)
1196 {
1197 	struct conn_buf_ring *cbr = &c->out_br;
1198 	struct conn_buf_ring *in_cbr = &c->in_br;
1199 	struct io_uring_buf *buf;
1200 	int nr_packets = 0;
1201 
1202 	while (in_bytes) {
1203 		struct io_uring_recvmsg_out *pdu;
1204 		int this_bytes;
1205 		void *data;
1206 
1207 		buf = &in_cbr->br->bufs[*bid];
1208 
1209 		/*
1210 		 * multishot recvmsg puts a header in front of the data - we
1211 		 * have to take that into account for the send setup, and
1212 		 * adjust the actual data read to not take this metadata into
1213 		 * account. For this use case, namelen and controllen will not
1214 		 * be set. If they were, they would need to be factored in too.
1215 		 */
1216 		buf->len -= sizeof(struct io_uring_recvmsg_out);
1217 		in_bytes -= sizeof(struct io_uring_recvmsg_out);
1218 
1219 		pdu = (void *) (unsigned long) buf->addr;
1220 		vlog("pdu namelen %d, controllen %d, payload %d flags %x\n",
1221 				pdu->namelen, pdu->controllen, pdu->payloadlen,
1222 				pdu->flags);
1223 		data = (void *) (pdu + 1);
1224 
1225 		this_bytes = pdu->payloadlen;
1226 		if (this_bytes > in_bytes)
1227 			this_bytes = in_bytes;
1228 
1229 		in_bytes -= this_bytes;
1230 
1231 		if (send_ring)
1232 			io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1233 						br_mask, nr_packets);
1234 		else
1235 			send_append(c, cd, data, *bid, this_bytes);
1236 
1237 		*bid = (*bid + 1) & (nr_bufs - 1);
1238 		nr_packets++;
1239 	}
1240 
1241 	if (send_ring)
1242 		io_uring_buf_ring_advance(cbr->br, nr_packets);
1243 
1244 	return nr_packets;
1245 }
1246 
1247 static int __handle_recv(struct io_uring *ring, struct conn *c,
1248 			 struct conn_dir *cd, struct io_uring_cqe *cqe)
1249 {
1250 	struct conn_dir *ocd = &c->cd[!cd->index];
1251 	int bid, nr_packets;
1252 
1253 	/*
1254 	 * Not having a buffer attached should only happen if we get a zero
1255 	 * sized receive, because the other end closed the connection. It
1256 	 * cannot happen otherwise, as all our receives are using provided
1257 	 * buffers and hence it's not possible to return a CQE with a non-zero
1258 	 * result and not have a buffer attached.
1259 	 */
1260 	if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1261 		cd->pending_recv = 0;
1262 
1263 		if (!recv_done_res(cqe->res)) {
1264 			fprintf(stderr, "no buffer assigned, res=%d\n", cqe->res);
1265 			return 1;
1266 		}
1267 start_close:
1268 		prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1269 		close_cd(c, cd);
1270 		return 0;
1271 	}
1272 
1273 	if (cqe->res && cqe->res < buf_size)
1274 		cd->rcv_shrt++;
1275 
1276 	bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1277 
1278 	/*
1279 	 * BIDI will use the same buffer pool and do receive on both CDs,
1280 	 * so can't reliably check. TODO.
1281 	 */
1282 	if (!bidi && cd->rcv_next_bid != -1 && bid != cd->rcv_next_bid) {
1283 		fprintf(stderr, "recv bid %d, wanted %d\n", bid, cd->rcv_next_bid);
1284 		goto start_close;
1285 	}
1286 
1287 	vlog("%d: recv: bid=%d, res=%d, cflags=%x\n", c->tid, bid, cqe->res, cqe->flags);
1288 	/*
1289 	 * If we're a sink, we're done here. Just replenish the buffer back
1290 	 * to the pool. For proxy mode, we will send the data to the other
1291 	 * end and the buffer will be replenished once the send is done with
1292 	 * it.
1293 	 */
1294 	if (is_sink)
1295 		nr_packets = replenish_buffers(c, &bid, cqe->res);
1296 	else if (rcv_msg && recv_mshot)
1297 		nr_packets = recv_mshot_msg(c, ocd, &bid, cqe->res);
1298 	else
1299 		nr_packets = recv_bids(c, ocd, &bid, cqe->res);
1300 
1301 	if (cd->rcv_bucket)
1302 		cd->rcv_bucket[nr_packets]++;
1303 
1304 	if (!is_sink) {
1305 		ocd->out_buffers += nr_packets;
1306 		assert(ocd->out_buffers <= nr_bufs);
1307 	}
1308 
1309 	cd->rcv++;
1310 	cd->rcv_next_bid = bid;
1311 
1312 	/*
1313 	 * If IORING_CQE_F_MORE isn't set, then this is either a normal recv
1314 	 * that needs rearming, or it's a multishot that won't post any further
1315 	 * completions. Setup a new one for these cases.
1316 	 */
1317 	if (!(cqe->flags & IORING_CQE_F_MORE)) {
1318 		cd->pending_recv = 0;
1319 		if (recv_done_res(cqe->res))
1320 			goto start_close;
1321 		if (is_sink)
1322 			__submit_receive(ring, c, &c->cd[0], c->in_fd);
1323 	}
1324 
1325 	/*
1326 	 * Submit a send if we won't get any more notifications from this
1327 	 * recv, or if we have nr_bufs / 2 queued up. If BIDI mode, send
1328 	 * every buffer. We assume this is interactive mode, and hence don't
1329 	 * delay anything.
1330 	 */
1331 	if (((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
1332 	    !(cqe->flags & IORING_CQE_F_MORE)) && !is_sink)
1333 		prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1334 
1335 	if (!recv_done_res(cqe->res))
1336 		cd->in_bytes += cqe->res;
1337 	return 0;
1338 }
1339 
1340 static int handle_recv(struct io_uring *ring, struct io_uring_cqe *cqe)
1341 {
1342 	struct conn *c = cqe_to_conn(cqe);
1343 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1344 
1345 	return __handle_recv(ring, c, cd, cqe);
1346 }
1347 
1348 static int recv_error(struct error_handler *err, struct io_uring *ring,
1349 		      struct io_uring_cqe *cqe)
1350 {
1351 	struct conn *c = cqe_to_conn(cqe);
1352 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1353 
1354 	cd->pending_recv = 0;
1355 
1356 	if (cqe->res != -ENOBUFS)
1357 		return default_error(err, ring, cqe);
1358 
1359 	recv_enobufs(ring, c, cd, other_dir_fd(c, cqe_to_fd(cqe)));
1360 	return 0;
1361 }
1362 
1363 static void submit_send(struct io_uring *ring, struct conn *c,
1364 			struct conn_dir *cd, int fd, void *data, int len,
1365 			int bid, int flags)
1366 {
1367 	struct io_uring_sqe *sqe;
1368 	int bgid = c->out_br.bgid;
1369 
1370 	if (cd->pending_send)
1371 		return;
1372 	cd->pending_send = 1;
1373 
1374 	flags |= MSG_WAITALL | MSG_NOSIGNAL;
1375 
1376 	sqe = get_sqe(ring);
1377 	if (snd_msg) {
1378 		struct io_msg *imsg = &cd->io_snd_msg;
1379 
1380 		if (snd_zc) {
1381 			io_uring_prep_sendmsg_zc(sqe, fd, &imsg->msg, flags);
1382 			cd->snd_notif++;
1383 		} else {
1384 			io_uring_prep_sendmsg(sqe, fd, &imsg->msg, flags);
1385 		}
1386 	} else if (send_ring) {
1387 		io_uring_prep_send(sqe, fd, NULL, 0, flags);
1388 	} else if (!snd_zc) {
1389 		io_uring_prep_send(sqe, fd, data, len, flags);
1390 	} else {
1391 		io_uring_prep_send_zc(sqe, fd, data, len, flags, 0);
1392 		sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
1393 		sqe->buf_index = bid;
1394 		cd->snd_notif++;
1395 	}
1396 	encode_userdata(sqe, c, __SEND, bid, fd);
1397 	if (fixed_files)
1398 		sqe->flags |= IOSQE_FIXED_FILE;
1399 	if (send_ring) {
1400 		sqe->flags |= IOSQE_BUFFER_SELECT;
1401 		sqe->buf_group = bgid;
1402 	}
1403 	if (snd_bundle) {
1404 		sqe->ioprio |= IORING_RECVSEND_BUNDLE;
1405 		cd->snd_mshot++;
1406 	} else if (send_ring)
1407 		cd->snd_mshot++;
1408 }
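/*
 * To summarize the variants prepared above: sendmsg or sendmsg_zc when
 * snd_msg is set; a buffer-selected send (NULL/0 address and length, data
 * picked from the outgoing buffer ring at issue time) when send_ring is
 * set; a plain send otherwise; or send_zc referencing the registered
 * receive-side buffers by index when zero-copy is enabled.
 */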
1409 
1410 /*
1411  * Prepare the next send request, if we need to. If one is already pending,
1412  * or if we're a sink and we don't need to do sends, then there's nothing
1413  * to do.
1414  *
1415  * Return 1 if another send completion is expected, 0 if not.
1416  */
1417 static int prep_next_send(struct io_uring *ring, struct conn *c,
1418 			   struct conn_dir *cd, int fd)
1419 {
1420 	int bid;
1421 
1422 	if (cd->pending_send || is_sink)
1423 		return 0;
1424 	if (!cd->out_buffers)
1425 		return 0;
1426 
1427 	bid = cd->snd_next_bid;
1428 	if (bid == -1)
1429 		bid = 0;
1430 
1431 	if (send_ring) {
1432 		/*
1433 		 * send_ring mode is easy, there's nothing to do but submit
1434 		 * our next send request. That will empty the entire outgoing
1435 		 * queue.
1436 		 */
1437 		submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1438 		return 1;
1439 	} else if (snd_msg) {
1440 		/*
1441 		 * For sendmsg mode, submit our currently prepared iovec, if
1442 		 * we have one, and swap our iovecs so that any further
1443 		 * receives will start preparing that one.
1444 		 */
1445 		struct io_msg *imsg = &cd->io_snd_msg;
1446 
1447 		if (!msg_vec(imsg)->iov_len)
1448 			return 0;
1449 		imsg->msg.msg_iov = msg_vec(imsg)->iov;
1450 		imsg->msg.msg_iovlen = msg_vec(imsg)->iov_len;
1451 		msg_vec(imsg)->iov_len = 0;
1452 		imsg->vec_index = !imsg->vec_index;
1453 		submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1454 		return 1;
1455 	} else {
1456 		/*
1457 		 * send without send_ring - submit the next available vec,
1458 		 * if any. If this vec is the last one in the current series,
1459 		 * then swap to the next vec. We flag each send with MSG_MORE,
1460 		 * unless this is the last part of the current vec.
1461 		 */
1462 		struct io_msg *imsg = &cd->io_snd_msg;
1463 		struct msg_vec *mvec = msg_vec(imsg);
1464 		int flags = !snd_zc ? MSG_MORE : 0;
1465 		struct iovec *iov;
1466 
1467 		if (mvec->iov_len == mvec->cur_iov)
1468 			return 0;
1469 		imsg->msg.msg_iov = msg_vec(imsg)->iov;
1470 		iov = &mvec->iov[mvec->cur_iov];
1471 		mvec->cur_iov++;
1472 		if (mvec->cur_iov == mvec->iov_len) {
1473 			mvec->iov_len = 0;
1474 			mvec->cur_iov = 0;
1475 			imsg->vec_index = !imsg->vec_index;
1476 			flags = 0;
1477 		}
1478 		submit_send(ring, c, cd, fd, iov->iov_base, iov->iov_len, bid, flags);
1479 		return 1;
1480 	}
1481 }
1482 
1483 /*
1484  * Handling a send with an outgoing send ring. Get the buffers from the
1485  * receive side, and add them back to the incoming buffer ring.
1486  */
1487 static int handle_send_ring(struct conn *c, struct conn_dir *cd,
1488 			    int bid, int bytes)
1489 {
1490 	struct conn_buf_ring *in_cbr = &c->in_br;
1491 	struct conn_buf_ring *out_cbr = &c->out_br;
1492 	int i = 0;
1493 
1494 	while (bytes) {
1495 		struct io_uring_buf *buf = &out_cbr->br->bufs[bid];
1496 		int this_bytes;
1497 		void *this_buf;
1498 
1499 		this_bytes = buf->len;
1500 		if (this_bytes > bytes)
1501 			this_bytes = bytes;
1502 
1503 		cd->out_bytes += this_bytes;
1504 
1505 		vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1506 
1507 		this_buf = in_cbr->buf + bid * buf_size;
1508 		io_uring_buf_ring_add(in_cbr->br, this_buf, buf_size, bid, br_mask, i);
1509 		/*
1510 		 * Find the provided buffer that the receive consumed, and
1511 		 * which we then used for the send, and add it back to the
1512 		 * pool so it can get picked by another receive. Once the send
1513 		 * is done, we're done with it.
1514 		 */
1515 		bid = (bid + 1) & (nr_bufs - 1);
1516 		bytes -= this_bytes;
1517 		i++;
1518 	}
1519 	cd->snd_next_bid = bid;
1520 	io_uring_buf_ring_advance(in_cbr->br, i);
1521 
1522 	if (pending_shutdown(c))
1523 		close_cd(c, cd);
1524 
1525 	return i;
1526 }
1527 
1528 /*
1529  * sendmsg, or send without a ring. Just add buffers back to the incoming
1530  * ring for receives.
1531  */
1532 static int handle_send_buf(struct conn *c, struct conn_dir *cd, int bid,
1533 			   int bytes)
1534 {
1535 	struct conn_buf_ring *in_cbr = &c->in_br;
1536 	int i = 0;
1537 
1538 	while (bytes) {
1539 		struct io_uring_buf *buf = &in_cbr->br->bufs[bid];
1540 		int this_bytes;
1541 
1542 		this_bytes = bytes;
1543 		if (this_bytes > buf->len)
1544 			this_bytes = buf->len;
1545 
1546 		vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1547 
1548 		cd->out_bytes += this_bytes;
1549 		/* each recvmsg mshot package has this overhead */
1550 		if (rcv_msg && recv_mshot)
1551 			cd->out_bytes += sizeof(struct io_uring_recvmsg_out);
1552 		replenish_buffer(in_cbr, bid, i);
1553 		bid = (bid + 1) & (nr_bufs - 1);
1554 		bytes -= this_bytes;
1555 		i++;
1556 	}
1557 	io_uring_buf_ring_advance(in_cbr->br, i);
1558 	cd->snd_next_bid = bid;
1559 	return i;
1560 }
1561 
1562 static int __handle_send(struct io_uring *ring, struct conn *c,
1563 			 struct conn_dir *cd, struct io_uring_cqe *cqe)
1564 {
1565 	struct conn_dir *ocd;
1566 	int bid, nr_packets;
1567 
1568 	if (send_ring) {
1569 		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1570 			fprintf(stderr, "no buffer in send?! %d\n", cqe->res);
1571 			return 1;
1572 		}
1573 		bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1574 	} else {
1575 		bid = cqe_to_bid(cqe);
1576 	}
1577 
1578 	/*
1579 	 * CQE notifications only happen with send/sendmsg zerocopy. They
1580 	 * tell us that the data has been acked, and that hence the buffer
1581 	 * is now free to reuse. Waiting on an ACK for each packet will slow
1582 	 * us down tremendously, so do all of our sends and then wait for
1583 	 * the ACKs to come in. They tend to come in bundles anyway. Once
1584 	 * all acks are done (cd->snd_notif == 0), then fire off the next
1585 	 * receive.
1586 	 */
1587 	if (cqe->flags & IORING_CQE_F_NOTIF) {
1588 		cd->snd_notif--;
1589 	} else {
1590 		if (cqe->res && cqe->res < buf_size)
1591 			cd->snd_shrt++;
1592 
1593 		/*
1594 		 * BIDI will use the same buffer pool and do sends on both CDs,
1595 		 * so can't reliably check. TODO.
1596 		 */
1597 		if (!bidi && send_ring && cd->snd_next_bid != -1 &&
1598 		    bid != cd->snd_next_bid) {
1599 			fprintf(stderr, "send bid %d, wanted %d at %lu\n", bid,
1600 					cd->snd_next_bid, cd->out_bytes);
1601 			goto out_close;
1602 		}
1603 
1604 		assert(bid <= nr_bufs);
1605 
1606 		vlog("send: got %d, %lu\n", cqe->res, cd->out_bytes);
1607 
1608 		if (send_ring)
1609 			nr_packets = handle_send_ring(c, cd, bid, cqe->res);
1610 		else
1611 			nr_packets = handle_send_buf(c, cd, bid, cqe->res);
1612 
1613 		if (cd->snd_bucket)
1614 			cd->snd_bucket[nr_packets]++;
1615 
1616 		cd->out_buffers -= nr_packets;
1617 		assert(cd->out_buffers >= 0);
1618 
1619 		cd->snd++;
1620 	}
1621 
1622 	if (!(cqe->flags & IORING_CQE_F_MORE)) {
1623 		int do_recv_arm;
1624 
1625 		cd->pending_send = 0;
1626 
1627 		/*
1628 		 * send done - see if the current vec has data to submit, and
1629 		 * do so if it does. if it doesn't have data yet, nothing to
1630 		 * do.
1631 		 */
1632 		do_recv_arm = !prep_next_send(ring, c, cd, cqe_to_fd(cqe));
1633 
1634 		ocd = &c->cd[!cd->index];
1635 		if (!cd->snd_notif && do_recv_arm && !ocd->pending_recv) {
1636 			int fd = other_dir_fd(c, cqe_to_fd(cqe));
1637 
1638 			__submit_receive(ring, c, ocd, fd);
1639 		}
1640 out_close:
1641 		if (pending_shutdown(c))
1642 			close_cd(c, cd);
1643 	}
1644 
1645 	vlog("%d: pending sends %d\n", c->tid, cd->pending_send);
1646 	return 0;
1647 }
1648 
1649 static int handle_send(struct io_uring *ring, struct io_uring_cqe *cqe)
1650 {
1651 	struct conn *c = cqe_to_conn(cqe);
1652 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1653 
1654 	return __handle_send(ring, c, cd, cqe);
1655 }
1656 
1657 static int send_error(struct error_handler *err, struct io_uring *ring,
1658 		      struct io_uring_cqe *cqe)
1659 {
1660 	struct conn *c = cqe_to_conn(cqe);
1661 	struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1662 
1663 	cd->pending_send = 0;
1664 
1665 	/* res can have high bit set */
1666 	if (cqe->flags & IORING_CQE_F_NOTIF)
1667 		return handle_send(ring, cqe);
1668 	if (cqe->res != -ENOBUFS)
1669 		return default_error(err, ring, cqe);
1670 
1671 	cd->snd_enobufs++;
1672 	return 0;
1673 }
1674 
1675 /*
1676  * We don't expect to get here, as we marked the shutdown as skipping its
1677  * CQE on success. If it does trigger, that means it failed and that our
1678  * close has not been done. Log the shutdown error and issue a new
1679  * separate close.
1680  */
1681 static int handle_shutdown(struct io_uring *ring, struct io_uring_cqe *cqe)
1682 {
1683 	struct conn *c = cqe_to_conn(cqe);
1684 	struct io_uring_sqe *sqe;
1685 	int fd = cqe_to_fd(cqe);
1686 
1687 	fprintf(stderr, "Got shutdown notication on fd %d\n", fd);
1688 
1689 	if (!cqe->res)
1690 		fprintf(stderr, "Unexpected success shutdown CQE\n");
1691 	else if (cqe->res < 0)
1692 		fprintf(stderr, "Shutdown got %s\n", strerror(-cqe->res));
1693 
1694 	sqe = get_sqe(ring);
1695 	if (fixed_files)
1696 		io_uring_prep_close_direct(sqe, fd);
1697 	else
1698 		io_uring_prep_close(sqe, fd);
1699 	encode_userdata(sqe, c, __CLOSE, 0, fd);
1700 	return 0;
1701 }
1702 
1703 /*
1704  * Final stage of a connection, the shutdown and close has finished. Mark
1705  * it as disconnected and let the main loop reap it.
1706  */
1707 static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
1708 {
1709 	struct conn *c = cqe_to_conn(cqe);
1710 	int fd = cqe_to_fd(cqe);
1711 
1712 	printf("Closed client: id=%d, in_fd=%d, out_fd=%d\n", c->tid, c->in_fd, c->out_fd);
1713 	if (fd == c->in_fd)
1714 		c->in_fd = -1;
1715 	else if (fd == c->out_fd)
1716 		c->out_fd = -1;
1717 
1718 	if (c->in_fd == -1 && c->out_fd == -1) {
1719 		c->flags |= CONN_F_DISCONNECTED;
1720 
1721 		pthread_mutex_lock(&thread_lock);
1722 		__show_stats(c);
1723 		open_conns--;
1724 		pthread_mutex_unlock(&thread_lock);
1725 		free_buffer_rings(ring, c);
1726 		free_msgs(&c->cd[0]);
1727 		free_msgs(&c->cd[1]);
1728 		free(c->cd[0].rcv_bucket);
1729 		free(c->cd[0].snd_bucket);
1730 	}
1731 
1732 	return 0;
1733 }
1734 
1735 static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
1736 {
1737 	struct conn *c = cqe_to_conn(cqe);
1738 	int fd = cqe_to_fd(cqe);
1739 
1740 	c->pending_cancels--;
1741 
1742 	vlog("%d: got cancel fd %d, refs %d\n", c->tid, fd, c->pending_cancels);
1743 
1744 	if (!c->pending_cancels) {
1745 		queue_shutdown_close(ring, c, c->in_fd);
1746 		if (c->out_fd != -1)
1747 			queue_shutdown_close(ring, c, c->out_fd);
1748 		io_uring_submit(ring);
1749 	}
1750 
1751 	return 0;
1752 }
1753 
1754 static void open_socket(struct conn *c)
1755 {
1756 	if (is_sink) {
1757 		pthread_mutex_lock(&thread_lock);
1758 		open_conns++;
1759 		pthread_mutex_unlock(&thread_lock);
1760 
1761 		submit_receive(&c->ring, c);
1762 	} else {
1763 		struct io_uring_sqe *sqe;
1764 		int domain;
1765 
1766 		if (ipv6)
1767 			domain = AF_INET6;
1768 		else
1769 			domain = AF_INET;
1770 
1771 		/*
1772 		 * If fixed_files is set, proxy will use fixed files for any new
1773 		 * file descriptors it instantiates. Fixed files, or fixed
1774 		 * descriptors, are io_uring private file descriptors. They
1775 		 * cannot be accessed outside of io_uring. io_uring holds a
1776 		 * fixed reference to them, which means that we do not need to
1777 		 * grab per-request references to them. Particularly for
1778 		 * threaded applications, grabbing and dropping file references
1779 		 * for each operation can be costly as the file table is shared.
1780 		 * This generally shows up as fget/fput related overhead in any
1781 		 * workload profiles.
1782 		 *
1783 		 * Fixed descriptors are passed in via the 'fd' field just like
1784 		 * regular descriptors, and then marked as such by setting the
1785 		 * IOSQE_FIXED_FILE flag in the sqe->flags field. Some helpers
1786 		 * do that automatically, like the below, others will need it
1787 		 * set manually if they don't have a *direct*() helper.
1788 		 *
1789 		 * For operations that instantiate them, like the opening of a
1790 		 * direct socket, the application may either ask the kernel to
1791 		 * find a free one (as is done below), or the application may
1792 		 * manage the space itself and pass in an index for a currently
1793 		 * free slot in the table. If the kernel is asked to allocate a
1794 		 * free direct descriptor, note that io_uring does not abide by
1795 		 * the POSIX mandated "lowest free must be returned". It may
1796 		 * return any free descriptor of its choosing.
1797 		 */
1798 		sqe = get_sqe(&c->ring);
1799 		if (fixed_files)
1800 			io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
1801 		else
1802 			io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
1803 		encode_userdata(sqe, c, __SOCK, 0, 0);
1804 	}
1805 }
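
/*
 * Illustrative sketch of the alternative mentioned above: rather than
 * letting the kernel pick a free direct descriptor slot with
 * io_uring_prep_socket_direct_alloc(), the application can manage the
 * table itself and request a specific slot. 'slot' is a hypothetical,
 * caller-chosen index that must refer to a free entry in the table
 * registered via io_uring_register_files_sparse().
 */
static __attribute__((__unused__))
void open_socket_fixed_slot(struct conn *c, unsigned int slot)
{
	struct io_uring_sqe *sqe;
	int domain = ipv6 ? AF_INET6 : AF_INET;

	sqe = get_sqe(&c->ring);
	/* install the new socket directly into table entry 'slot' */
	io_uring_prep_socket_direct(sqe, domain, SOCK_STREAM, 0, slot, 0);
	encode_userdata(sqe, c, __SOCK, 0, 0);
}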

/*
 * Start of a connection: we got our in descriptor.
 */
static int handle_fd_pass(struct io_uring_cqe *cqe)
{
	struct conn *c = cqe_to_conn(cqe);
	int fd = cqe_to_fd(cqe);

	vlog("%d: got fd pass %d\n", c->tid, fd);
	c->in_fd = fd;
	open_socket(c);
	return 0;
}

static int handle_stop(struct io_uring_cqe *cqe)
{
	struct conn *c = cqe_to_conn(cqe);

	printf("Client %d: queueing shutdown\n", c->tid);
	queue_cancel(&c->ring, c);
	return 0;
}

/*
 * Called for each CQE that we receive. Decode the request type that it
 * came from, and call the appropriate handler.
 */
static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
{
	int ret;

	/*
	 * Unlikely, but there's an error in this CQE. If an error handler
	 * is defined, call it, and that will deal with it. If no error
	 * handler is defined, the opcode handler either doesn't care or will
	 * handle it on its own.
	 */
	if (cqe->res < 0) {
		struct error_handler *err = &error_handlers[cqe_to_op(cqe)];

		if (err->error_fn)
			return err->error_fn(err, ring, cqe);
	}

	switch (cqe_to_op(cqe)) {
	case __ACCEPT:
		ret = handle_accept(ring, cqe);
		break;
	case __SOCK:
		ret = handle_sock(ring, cqe);
		break;
	case __CONNECT:
		ret = handle_connect(ring, cqe);
		break;
	case __RECV:
	case __RECVMSG:
		ret = handle_recv(ring, cqe);
		break;
	case __SEND:
	case __SENDMSG:
		ret = handle_send(ring, cqe);
		break;
	case __CANCEL:
		ret = handle_cancel(ring, cqe);
		break;
	case __SHUTDOWN:
		ret = handle_shutdown(ring, cqe);
		break;
	case __CLOSE:
		ret = handle_close(ring, cqe);
		break;
	case __FD_PASS:
		ret = handle_fd_pass(cqe);
		break;
	case __STOP:
		ret = handle_stop(cqe);
		break;
	case __NOP:
		ret = 0;
		break;
	default:
		fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
		return 1;
	}

	return ret;
}

static void house_keeping(struct io_uring *ring)
{
	static unsigned long last_bytes;
	unsigned long bytes, elapsed;
	struct conn *c;
	int i, j;

	vlog("House keeping entered\n");

	bytes = 0;
	for (i = 0; i < nr_conns; i++) {
		c = &conns[i];

		for (j = 0; j < 2; j++) {
			struct conn_dir *cd = &c->cd[j];

			bytes += cd->in_bytes + cd->out_bytes;
		}
		if (c->flags & CONN_F_DISCONNECTED) {
			vlog("%d: disconnected\n", i);

			if (!(c->flags & CONN_F_REAPED)) {
				void *ret;

				pthread_join(c->thread, &ret);
				c->flags |= CONN_F_REAPED;
			}
			continue;
		}
		if (c->flags & CONN_F_DISCONNECTING)
			continue;

		if (should_shutdown(c)) {
			__close_conn(ring, c);
			c->flags |= CONN_F_DISCONNECTING;
		}
	}

	elapsed = mtime_since_now(&last_housekeeping);
	if (bytes && elapsed >= 900) {
		unsigned long bw;

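		/*
		 * Units, assuming mtime_since_now() returns milliseconds:
		 * bytes delta * 8 = bits, / 1000 = kilobits, and kilobits
		 * per millisecond works out to Mbit/sec.
		 */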
		bw = (8 * (bytes - last_bytes) / 1000UL) / elapsed;
		if (bw) {
			if (open_conns)
				printf("Bandwidth (threads=%d): %'luMbit\n", open_conns, bw);
			gettimeofday(&last_housekeeping, NULL);
			last_bytes = bytes;
		}
	}
}
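
/*
 * Illustrative, standalone example of the -w/-t combination handled in
 * __event_loop() below: submit pending requests and wait for up to 8
 * completions, but no longer than 250 usec. The real loop builds its
 * timespec from wait_batch and wait_usec instead of these hardcoded values.
 */
static __attribute__((__unused__))
int wait_some_events(struct io_uring *ring)
{
	struct __kernel_timespec ts = {
		.tv_sec = 0,
		.tv_nsec = 250 * 1000,	/* 250 usec, expressed in nsec */
	};
	struct io_uring_cqe *cqe;

	return io_uring_submit_and_wait_timeout(ring, &cqe, 8, &ts, NULL);
}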

/*
 * Event loop shared between the parent, and the connections. Could be
 * split in two, as they don't handle the same types of events. For the per
 * connection loop, 'c' is valid. For the main loop, it's NULL.
 */
static int __event_loop(struct io_uring *ring, struct conn *c)
{
	struct __kernel_timespec active_ts, idle_ts;
	int flags;

	idle_ts.tv_sec = 0;
	idle_ts.tv_nsec = 100000000LL;
	active_ts = idle_ts;
	if (wait_usec > 1000000) {
		active_ts.tv_sec = wait_usec / 1000000;
		wait_usec -= active_ts.tv_sec * 1000000;
	}
	active_ts.tv_nsec = wait_usec * 1000;

	gettimeofday(&last_housekeeping, NULL);

	flags = 0;
	while (1) {
		struct __kernel_timespec *ts = &idle_ts;
		struct io_uring_cqe *cqe;
		unsigned int head;
		int ret, i, to_wait;

		/*
		 * If wait_batch is set higher than 1, then we'll wait on
		 * that amount of CQEs to be posted each loop. If used with
		 * DEFER_TASKRUN, this can provide a substantial reduction
		 * in context switch rate as the task isn't woken until the
		 * requested number of events can be returned.
		 *
		 * Can be used with -t to set a wait_usec timeout as well.
		 * For example, if an application can deal with 250 usec
		 * of wait latencies, it can set -w8 -t250 which will cause
		 * io_uring to return when either 8 events have been received,
		 * or if 250 usec of waiting has passed (see the
		 * wait_some_events() sketch above).
		 *
		 * If we don't have any open connections, wait on just 1
		 * always.
		 */
		to_wait = 1;
		if (open_conns && !flags) {
			ts = &active_ts;
			to_wait = wait_batch;
		}

		vlog("Submit and wait for %d\n", to_wait);
		ret = io_uring_submit_and_wait_timeout(ring, &cqe, to_wait, ts, NULL);

		if (*ring->cq.koverflow)
			printf("overflow %u\n", *ring->cq.koverflow);
		if (*ring->sq.kflags & IORING_SQ_CQ_OVERFLOW)
			printf("saw overflow\n");

		vlog("Submit and wait: %d\n", ret);

		i = flags = 0;
		io_uring_for_each_cqe(ring, head, cqe) {
			if (handle_cqe(ring, cqe))
				return 1;
			flags |= cqe_to_conn(cqe)->flags;
			++i;
		}

		vlog("Handled %d events\n", i);

		/*
		 * Advance the CQ ring for seen events when we've processed
		 * all of them in this loop. This can also be done with
		 * io_uring_cqe_seen() in each handler above, which just marks
		 * that single CQE as seen (see the reap_one_event() sketch
		 * after this function). However, it's more efficient to
		 * mark a batch as seen when we're done with that batch.
		 */
		if (i) {
			io_uring_cq_advance(ring, i);
			events += i;
		}

		event_loops++;
		if (c) {
			if (c->flags & CONN_F_DISCONNECTED)
				break;
		} else {
			house_keeping(ring);
		}
	}

	return 0;
}
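
/*
 * Illustrative sketch of the per-CQE alternative mentioned in the loop
 * above: wait for a single completion and mark just that one CQE as seen.
 * Correct, but less efficient than batching with io_uring_for_each_cqe()
 * and io_uring_cq_advance() when several completions are available.
 */
static __attribute__((__unused__))
int reap_one_event(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	int ret;

	ret = io_uring_wait_cqe(ring, &cqe);
	if (ret < 0)
		return ret;
	ret = handle_cqe(ring, cqe);
	/* mark only this CQE as consumed */
	io_uring_cqe_seen(ring, cqe);
	return ret;
}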

/*
 * Main event loop: submit our multishot accept request, and then just loop
 * around handling incoming connections.
 */
static int parent_loop(struct io_uring *ring, int fd)
{
	struct io_uring_sqe *sqe;

	/*
	 * proxy provides a way to use either multishot receive or not, but
	 * for accept, we always use multishot. A multishot accept request
	 * needs only be armed once, and then it'll trigger a completion and
	 * post a CQE whenever a new connection is accepted. No need to do
	 * anything else, unless the multishot accept terminates. This happens
	 * if it encounters an error. Applications should check for
	 * IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
	 * are expected from this request or not. Non-multishot requests never
	 * have it set, whereas multishot requests will always have it set
	 * unless an error occurs (see the rearm_accept_if_done() sketch after
	 * this function).
	 */
	sqe = get_sqe(ring);
	if (fixed_files)
		io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
	else
		io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
	__encode_userdata(sqe, 0, __ACCEPT, 0, fd);

	return __event_loop(ring, NULL);
}
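
/*
 * Illustrative sketch of the IORING_CQE_F_MORE check described above. If a
 * multishot accept CQE arrives without F_MORE set, the request has
 * terminated (typically on error) and must be re-armed if the application
 * wants to keep accepting. 'listen_fd' is a hypothetical listening socket;
 * the non-fixed accept variant is used to keep the example simple.
 */
static __attribute__((__unused__))
void rearm_accept_if_done(struct io_uring *ring, struct io_uring_cqe *cqe,
			  int listen_fd)
{
	struct io_uring_sqe *sqe;

	if (cqe->flags & IORING_CQE_F_MORE)
		return;

	sqe = get_sqe(ring);
	io_uring_prep_multishot_accept(sqe, listen_fd, NULL, NULL, 0);
	__encode_userdata(sqe, 0, __ACCEPT, 0, listen_fd);
	io_uring_submit(ring);
}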

static int init_ring(struct io_uring *ring, int nr_files)
{
	struct io_uring_params params;
	int ret;

	/*
	 * By default, set us up with a big CQ ring. Not strictly needed
	 * here, but it's very important to never overflow the CQ ring.
	 * Events will not be dropped if this happens, but it does slow
	 * the application down in dealing with overflowed events.
	 *
	 * Set SINGLE_ISSUER, which tells the kernel that only one thread
	 * is doing IO submissions. This enables certain optimizations in
	 * the kernel.
	 */
	memset(&params, 0, sizeof(params));
	params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
	params.flags |= IORING_SETUP_CQSIZE;
	params.cq_entries = 1024;

	/*
	 * If use_huge is set, setup the ring with IORING_SETUP_NO_MMAP. This
	 * means that the application allocates the memory for the ring, and
	 * the kernel maps it. The alternative is having the kernel allocate
	 * the memory, and then liburing will mmap it. But we can't really
	 * support huge pages that way. If this fails, then ensure that the
	 * system has huge pages set aside upfront.
	 */
	if (use_huge)
		params.flags |= IORING_SETUP_NO_MMAP;

	/*
	 * DEFER_TASKRUN decouples async event reaping and retrying from
	 * regular system calls. If this isn't set, then io_uring uses
	 * normal task_work for this. task_work is always being run on any
	 * exit to userspace. Real applications do more than just call IO
	 * related system calls, and hence we can be running this work way
	 * too often. Using DEFER_TASKRUN defers any task_work running to
	 * when the application enters the kernel anyway to wait on new
	 * events. It's generally the preferred and recommended way to setup
	 * a ring.
	 */
	if (defer_tw) {
		params.flags |= IORING_SETUP_DEFER_TASKRUN;
		sqpoll = 0;
	}

	/*
	 * SQPOLL offloads any request submission and retry operations to a
	 * dedicated thread. This enables an application to do IO without
	 * ever having to enter the kernel itself. The SQPOLL thread will
	 * stay busy as long as there's work to do, and go to sleep if
	 * sq_thread_idle msecs have passed. If it's running, submitting new
	 * IO just needs to make them visible to the SQPOLL thread, it need
	 * not enter the kernel. For submission, the application will only
	 * enter the kernel if the SQPOLL thread has been idle long enough
	 * that it has gone to sleep.
	 *
	 * Waiting on events still needs to enter the kernel, if none are
	 * available. The application may also use io_uring_peek_cqe() to
	 * check for new events without entering the kernel (see the
	 * drain_ready_cqes() sketch after this function), as completions
	 * will be continually produced to the CQ ring by the SQPOLL thread
	 * as they occur.
	 */
	if (sqpoll) {
		params.flags |= IORING_SETUP_SQPOLL;
		params.sq_thread_idle = 1000;
		defer_tw = 0;
	}

	/*
	 * If neither DEFER_TASKRUN nor SQPOLL is used, set COOP_TASKRUN. This
	 * avoids heavy signal based notifications, which can force an
	 * application to enter the kernel and process it as soon as they
	 * occur.
	 */
	if (!sqpoll && !defer_tw)
		params.flags |= IORING_SETUP_COOP_TASKRUN;

	/*
	 * The SQ ring size need not be larger than any batch of requests
	 * that need to be prepared before submit. Normally in a loop we'd
	 * only need a few, if any, particularly if multishot is used.
	 */
	ret = io_uring_queue_init_params(ring_size, ring, &params);
	if (ret) {
		fprintf(stderr, "%s\n", strerror(-ret));
		return 1;
	}

	/*
	 * If send serialization is available and no option was given to
	 * enable or disable it, default it to on. If it was turned on and
	 * the kernel doesn't support it, turn it off.
	 */
	if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
		if (send_ring == -1)
			send_ring = 1;
	} else {
		if (send_ring == 1) {
			fprintf(stderr, "Kernel doesn't support ring provided "
				"buffers for sends, disabled\n");
		}
		send_ring = 0;
	}

	if (!send_ring && snd_bundle) {
		fprintf(stderr, "Can't use send bundle without send_ring\n");
		snd_bundle = 0;
	}

	if (fixed_files) {
		/*
		 * If fixed files are used, we need to allocate a fixed file
		 * table upfront where new direct descriptors can be managed.
		 */
		ret = io_uring_register_files_sparse(ring, nr_files);
		if (ret) {
			fprintf(stderr, "file register: %d\n", ret);
			return 1;
		}

		/*
		 * If fixed files are used, we also register the ring fd. See
		 * the comment near io_uring_prep_socket_direct_alloc() in
		 * open_socket() above. This avoids the fget/fput overhead
		 * associated with the io_uring_enter(2) system call itself,
		 * which is used to submit and wait on events.
		 */
		ret = io_uring_register_ring_fd(ring);
		if (ret != 1) {
			fprintf(stderr, "ring register: %d\n", ret);
			return 1;
		}
	}

	if (napi) {
		struct io_uring_napi n = {
			.prefer_busy_poll = napi > 1 ? 1 : 0,
			.busy_poll_to = napi_timeout,
		};

		ret = io_uring_register_napi(ring, &n);
		if (ret) {
			fprintf(stderr, "io_uring_register_napi: %d\n", ret);
			if (ret != -EINVAL)
				return 1;
			fprintf(stderr, "NAPI not available, turned off\n");
		}
	}

	return 0;
}
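
/*
 * Illustrative sketch of the io_uring_peek_cqe() usage mentioned in the
 * SQPOLL comment above: drain whatever completions are already sitting in
 * the CQ ring without entering the kernel. Returns how many CQEs were
 * handled.
 */
static __attribute__((__unused__))
int drain_ready_cqes(struct io_uring *ring)
{
	struct io_uring_cqe *cqe;
	int handled = 0;

	/* io_uring_peek_cqe() returns -EAGAIN when the CQ ring is empty */
	while (!io_uring_peek_cqe(ring, &cqe)) {
		if (handle_cqe(ring, cqe))
			fprintf(stderr, "CQE handling failed\n");
		io_uring_cqe_seen(ring, cqe);
		handled++;
	}
	return handled;
}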

static void *thread_main(void *data)
{
	struct conn *c = data;
	int ret;

	c->flags |= CONN_F_STARTED;

	/* we need a max of 4 descriptors for each client */
	ret = init_ring(&c->ring, 4);
	if (ret)
		goto done;

	if (setup_buffer_rings(&c->ring, c))
		goto done;

	/*
	 * If we're using fixed files, then we need to wait for the parent
	 * to install the c->in_fd into our direct descriptor table. When
	 * that happens, we'll set things up. If we're not using fixed files,
	 * we can set up the receive or connect now.
	 */
	if (!fixed_files)
		open_socket(c);

	/* we're ready */
	pthread_barrier_wait(&c->startup_barrier);

	__event_loop(&c->ring, c);
done:
	return NULL;
}

static void usage(const char *name)
{
	printf("%s:\n", name);
	printf("\t-m:\t\tUse multishot receive (%d)\n", recv_mshot);
	printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
	printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
	printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
	printf("\t-a:\t\tUse huge pages for the ring (%d)\n", use_huge);
	printf("\t-t:\t\tTimeout for waiting on CQEs (usec) (%d)\n", wait_usec);
	printf("\t-w:\t\tNumber of CQEs to wait for each loop (%d)\n", wait_batch);
	printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
	printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
	printf("\t-q:\t\tRing size to use (%d)\n", ring_size);
	printf("\t-H:\t\tHost to connect to (%s)\n", host);
	printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
	printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
	printf("\t-6:\t\tUse IPv6 (%d)\n", ipv6);
	printf("\t-N:\t\tUse NAPI polling (%d)\n", napi);
	printf("\t-T:\t\tNAPI timeout (usec) (%d)\n", napi_timeout);
	printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
	printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
	printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring);
	printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle);
	printf("\t-z:\t\tUse zerocopy send (%d)\n", snd_zc);
	printf("\t-c:\t\tUse bundles for recv (%d)\n", rcv_bundle);
	printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg);
	printf("\t-R:\t\tUse recvmsg (%d)\n", rcv_msg);
	printf("\t-x:\t\tShow extended stats (%d)\n", ext_stat);
	printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
}
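
/*
 * Illustrative sketch of why main() below grows buf_size for recvmsg with
 * multishot: each completed buffer starts with a struct io_uring_recvmsg_out
 * header, so the usable payload is smaller than the provided buffer.
 * liburing has helpers to peel that header off; 'msgh' is assumed to be the
 * same msghdr that the recvmsg request was prepared with.
 */
static __attribute__((__unused__))
void *recvmsg_mshot_payload(void *buf, int buf_len, struct msghdr *msgh,
			    unsigned int *payload_len)
{
	struct io_uring_recvmsg_out *out;

	out = io_uring_recvmsg_validate(buf, buf_len, msgh);
	if (!out)
		return NULL;
	*payload_len = io_uring_recvmsg_payload_length(out, buf_len, msgh);
	return io_uring_recvmsg_payload(out, msgh);
}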

/*
 * Parse options, then set up the ring and networking.
 */
int main(int argc, char *argv[])
{
	struct io_uring ring;
	struct sigaction sa = { };
	const char *optstring;
	int opt, ret, fd;

	setlocale(LC_NUMERIC, "en_US");

	page_size = sysconf(_SC_PAGESIZE);
	if (page_size < 0) {
		perror("sysconf(_SC_PAGESIZE)");
		return 1;
	}

	pthread_mutex_init(&thread_lock, NULL);

	optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:z:6Vh?";
	while ((opt = getopt(argc, argv, optstring)) != -1) {
		switch (opt) {
		case 'm':
			recv_mshot = !!atoi(optarg);
			break;
		case 'S':
			sqpoll = !!atoi(optarg);
			break;
		case 'd':
			defer_tw = !!atoi(optarg);
			break;
		case 'b':
			buf_size = atoi(optarg);
			break;
		case 'n':
			nr_bufs = atoi(optarg);
			break;
		case 'u':
			send_ring = !!atoi(optarg);
			break;
		case 'c':
			rcv_bundle = !!atoi(optarg);
			break;
		case 'C':
			snd_bundle = !!atoi(optarg);
			break;
		case 'w':
			wait_batch = atoi(optarg);
			break;
		case 't':
			wait_usec = atoi(optarg);
			break;
		case 's':
			is_sink = !!atoi(optarg);
			break;
		case 'f':
			fixed_files = !!atoi(optarg);
			break;
		case 'H':
			host = strdup(optarg);
			break;
		case 'r':
			receive_port = atoi(optarg);
			break;
		case 'p':
			send_port = atoi(optarg);
			break;
		case 'B':
			bidi = !!atoi(optarg);
			break;
		case 'N':
			napi = !!atoi(optarg);
			break;
		case 'T':
			napi_timeout = atoi(optarg);
			break;
		case '6':
			ipv6 = true;
			break;
		case 'M':
			snd_msg = !!atoi(optarg);
			break;
		case 'z':
			snd_zc = !!atoi(optarg);
			break;
		case 'R':
			rcv_msg = !!atoi(optarg);
			break;
		case 'q':
			ring_size = atoi(optarg);
			break;
		case 'a':
			use_huge = !!atoi(optarg);
			break;
		case 'x':
			ext_stat = !!atoi(optarg);
			break;
		case 'V':
			verbose++;
			break;
		case 'h':
		default:
			usage(argv[0]);
			return 1;
		}
	}

	if (bidi && is_sink) {
		fprintf(stderr, "Can't be both bidi proxy and sink\n");
		return 1;
	}
	if (snd_msg && sqpoll) {
		fprintf(stderr, "SQPOLL with msg variants disabled\n");
		snd_msg = 0;
	}
	if (rcv_msg && rcv_bundle) {
		fprintf(stderr, "Can't use bundles with recvmsg\n");
		rcv_msg = 0;
	}
	if (snd_msg && snd_bundle) {
		fprintf(stderr, "Can't use bundles with sendmsg\n");
		snd_msg = 0;
	}
	if (snd_msg && send_ring) {
		fprintf(stderr, "Can't use send ring with sendmsg\n");
		snd_msg = 0;
	}
	if (snd_zc && (send_ring || snd_bundle)) {
		fprintf(stderr, "Can't use send zc with bundles or ring\n");
		send_ring = snd_bundle = 0;
	}
	/*
	 * For recvmsg w/multishot, we waste some data at the head of the
	 * packet every time. Adjust the buffer size to account for that,
	 * so we're still handling 'buf_size' bytes of actual payload data
	 * (see the recvmsg_mshot_payload() sketch above).
	 */
	if (rcv_msg && recv_mshot) {
		fprintf(stderr, "Adjusted buf size for recvmsg w/multishot\n");
		buf_size += sizeof(struct io_uring_recvmsg_out);
	}

	br_mask = nr_bufs - 1;

	fd = setup_listening_socket(receive_port, ipv6);
	if (is_sink)
		send_port = -1;

	if (fd == -1)
		return 1;

	atexit(show_stats);
	sa.sa_handler = sig_int;
	sa.sa_flags = SA_RESTART;
	sigaction(SIGINT, &sa, NULL);

	ret = init_ring(&ring, MAX_CONNS * 3);
	if (ret)
		return ret;

	printf("Backend: sqpoll=%d, defer_tw=%d, fixed_files=%d, "
		"is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d, "
		"receive_port=%d, napi=%d, napi_timeout=%d, huge_page=%d\n",
			sqpoll, defer_tw, fixed_files, is_sink,
			buf_size, nr_bufs, host, send_port, receive_port,
			napi, napi_timeout, use_huge);
	printf(" recv options: recvmsg=%d, recv_mshot=%d, recv_bundle=%d\n",
			rcv_msg, recv_mshot, rcv_bundle);
	printf(" send options: sendmsg=%d, send_ring=%d, send_bundle=%d, "
		"send_zerocopy=%d\n", snd_msg, send_ring, snd_bundle,
			snd_zc);

	return parent_loop(&ring, fd);
}