1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Sample program that can act either as a packet sink, where it just receives
4 * packets and doesn't do anything with them, or it can act as a proxy where it
5 * receives packets and then sends them to a new destination. The proxy can
 * be unidirectional (-B0), or bi-directional (-B1).
7 *
8 * Examples:
9 *
10 * Act as a proxy, listening on port 4444, and send data to 192.168.2.6 on port
11 * 4445. Use multishot receive, DEFER_TASKRUN, and fixed files
12 *
13 * ./proxy -m1 -r4444 -H 192.168.2.6 -p4445
14 *
15 * Same as above, but utilize send bundles (-C1, requires -u1 send_ring) as well
16 * with ring provided send buffers, and recv bundles (-c1).
17 *
18 * ./proxy -m1 -c1 -u1 -C1 -r4444 -H 192.168.2.6 -p4445
19 *
20 * Act as a bi-directional proxy, listening on port 8888, and send data back
21 * and forth between host and 192.168.2.6 on port 22. Use multishot receive,
22 * DEFER_TASKRUN, fixed files, and buffers of size 1500.
23 *
24 * ./proxy -m1 -B1 -b1500 -r8888 -H 192.168.2.6 -p22
25 *
 * Act as a sink, listening on port 4445, using multishot receive, DEFER_TASKRUN,
27 * and fixed files:
28 *
29 * ./proxy -m1 -s1 -r4445
30 *
31 * Run with -h to see a list of options, and their defaults.
32 *
33 * (C) 2024 Jens Axboe <axboe@kernel.dk>
34 *
35 */
36 #include <fcntl.h>
37 #include <stdint.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <arpa/inet.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/socket.h>
45 #include <sys/time.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48 #include <linux/mman.h>
49 #include <locale.h>
50 #include <assert.h>
51 #include <pthread.h>
52 #include <liburing.h>
53
54 #include "proxy.h"
55 #include "helpers.h"
56
57 /*
58 * Will go away once/if bundles are upstreamed and we put the generic
59 * definitions in the kernel header.
60 */
/* recv/send bundle flag; OR'ed into sqe->ioprio when bundles are enabled */
#ifndef IORING_RECVSEND_BUNDLE
#define IORING_RECVSEND_BUNDLE		(1U << 4)
#endif

/* fallback value for headers that do not define this feature bit yet */
#ifndef IORING_FEAT_SEND_BUF_SELECT
#define IORING_FEAT_SEND_BUF_SELECT	(1U << 14)
#endif
67
/*
 * Bookkeeping shared by all connections.
 */
/* next buffer group ID to hand out, each connection consumes two */
static int cur_bgid = 1;
/* connections accepted so far, also used as the tid of the next one */
static int nr_conns;
/* bumped under thread_lock once a connection's connect has completed */
static int open_conns;
/* alignment used for the non-huge receive buffer allocation */
static long page_size;

/* event loop counters, reported by show_stats() */
static unsigned long event_loops;
static unsigned long events;

/*
 * Ring setup and wait behavior.
 */
static int sqpoll;
static int defer_tw = 1;
static int fixed_files = 1;
static int ring_size = 128;
static int napi;
static int napi_timeout;
static int wait_batch = 1;
static int wait_usec = 1000000;

/*
 * Operating mode and end points.
 */
static int is_sink;
static int bidi;
static int ipv6;
static char *host = "192.168.3.2";
static int send_port = 4445;
static int receive_port = 4444;

/*
 * Receive side knobs.
 */
static int recv_mshot = 1;
static int rcv_msg;
static int rcv_bundle;

/*
 * Send side knobs. send_ring starts out as -1, which still evaluates as
 * true wherever it is tested as a boolean.
 */
static int snd_msg;
static int snd_zc;
static int send_ring = -1;
static int snd_bundle;

/*
 * Provided buffer geometry: nr_bufs buffers of buf_size bytes each, with
 * br_mask being the mask handed to io_uring_buf_ring_add().
 */
static int buf_size = 32;
static int nr_bufs = 256;
static int br_mask;
static int use_huge;

/* output verbosity */
static int ext_stat;
static int verbose;

static pthread_mutex_t thread_lock;
static struct timeval last_housekeeping;
108
109 /*
110 * For sendmsg/recvmsg. recvmsg just has a single vec, sendmsg will have
111 * two vecs - one that is currently submitted and being sent, and one that
112 * is being prepared. When a new sendmsg is issued, we'll swap which one we
113 * use. For send, even though we don't pass in the iovec itself, we use the
114 * vec to serialize the sends to avoid reordering.
115 */
struct msg_vec {
	struct iovec *iov;	/* heap allocated array of segments */
	int vec_size;		/* number of entries allocated in iov */
	int iov_len;		/* number of entries currently in use */
	int cur_iov;		/* send only: index currently being processed */
};
125
126 struct io_msg {
127 struct msghdr msg;
128 struct msg_vec vecs[2];
129 /* current msg_vec being prepared */
130 int vec_index;
131 };
132
133 /*
134 * Per socket stats per connection. For bi-directional, we'll have both
135 * sends and receives on each socket, this helps track them seperately.
136 * For sink or one directional, each of the two stats will be only sends
137 * or receives, not both.
138 */
139 struct conn_dir {
140 int index;
141
142 int pending_shutdown;
143 int pending_send;
144 int pending_recv;
145
146 int snd_notif;
147
148 int out_buffers;
149
150 int rcv, rcv_shrt, rcv_enobufs, rcv_mshot;
151 int snd, snd_shrt, snd_enobufs, snd_busy, snd_mshot;
152
153 int snd_next_bid;
154 int rcv_next_bid;
155
156 int *rcv_bucket;
157 int *snd_bucket;
158
159 unsigned long in_bytes, out_bytes;
160
161 /* only ever have a single recv pending */
162 struct io_msg io_rcv_msg;
163
164 /* one send that is inflight, and one being prepared for the next one */
165 struct io_msg io_snd_msg;
166 };
167
/* bits kept in conn->flags */
enum {
	CONN_F_STARTED		= 1 << 0,
	CONN_F_DISCONNECTING	= 1 << 1,
	CONN_F_DISCONNECTED	= 1 << 2,
	CONN_F_PENDING_SHUTDOWN	= 1 << 3,
	CONN_F_STATS_SHOWN	= 1 << 4,
	CONN_F_END_TIME		= 1 << 5,
	CONN_F_REAPED		= 1 << 6,
};
177
/*
 * A provided buffer ring owned by one connection: the ring itself, the
 * memory backing its buffers (if any), and the buffer group ID it was
 * registered under.
 */
struct conn_buf_ring {
	struct io_uring_buf_ring *br;
	void *buf;
	int bgid;
};
186
187 struct conn {
188 struct io_uring ring;
189
190 /* receive side buffer ring, new data arrives here */
191 struct conn_buf_ring in_br;
192 /* if send_ring is used, outgoing data to send */
193 struct conn_buf_ring out_br;
194
195 int tid;
196 int in_fd, out_fd;
197 int pending_cancels;
198 int flags;
199
200 struct conn_dir cd[2];
201
202 struct timeval start_time, end_time;
203
204 union {
205 struct sockaddr_in addr;
206 struct sockaddr_in6 addr6;
207 };
208
209 pthread_t thread;
210 pthread_barrier_t startup_barrier;
211 };
212
213 #define MAX_CONNS 1024
214 static struct conn conns[MAX_CONNS];
215
/* printf() that only produces output if 'verbose' is set */
#define vlog(fmt, ...)	do {				\
	if (verbose)					\
		printf(fmt, ##__VA_ARGS__);		\
} while (0)
220
/* defined later in the file, needed before their definition */
static int prep_next_send(struct io_uring *ring, struct conn *c, struct conn_dir *cd,
			  int fd);

static void *thread_main(void *data);
224
cqe_to_conn(struct io_uring_cqe * cqe)225 static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
226 {
227 struct userdata ud = { .val = cqe->user_data };
228
229 return &conns[ud.op_tid & TID_MASK];
230 }
231
cqe_to_conn_dir(struct conn * c,struct io_uring_cqe * cqe)232 static struct conn_dir *cqe_to_conn_dir(struct conn *c,
233 struct io_uring_cqe *cqe)
234 {
235 int fd = cqe_to_fd(cqe);
236
237 return &c->cd[fd != c->in_fd];
238 }
239
other_dir_fd(struct conn * c,int fd)240 static int other_dir_fd(struct conn *c, int fd)
241 {
242 if (c->in_fd == fd)
243 return c->out_fd;
244 return c->in_fd;
245 }
246
247 /* currently active msg_vec */
msg_vec(struct io_msg * imsg)248 static struct msg_vec *msg_vec(struct io_msg *imsg)
249 {
250 return &imsg->vecs[imsg->vec_index];
251 }
252
snd_msg_vec(struct conn_dir * cd)253 static struct msg_vec *snd_msg_vec(struct conn_dir *cd)
254 {
255 return msg_vec(&cd->io_snd_msg);
256 }
257
/*
 * Goes from accept new connection -> create socket, connect to end
 * point, prepare recv, on receive do send (unless sink). If either end
 * disconnects, we transition to shutdown and then close.
 *
 * The values start at 1 and are consecutive, they double as the index
 * into error_handlers[].
 */
enum {
	__ACCEPT = 1,
	__SOCK,
	__CONNECT,
	__RECV,
	__RECVMSG,
	__SEND,
	__SENDMSG,
	__SHUTDOWN,
	__CANCEL,
	__CLOSE,
	__FD_PASS,
	__NOP,
	__STOP,
};
278
/*
 * Error handling for one opcode: a name used in messages, and an optional
 * callback that gets the handler itself, the ring, and the failed CQE.
 */
struct error_handler {
	const char *name;
	int (*error_fn)(struct error_handler *, struct io_uring *,
			struct io_uring_cqe *);
};
283
/* referenced by error_handlers[] below, defined later in the file */
static int recv_error(struct error_handler *err, struct io_uring *ring, struct io_uring_cqe *cqe);
static int send_error(struct error_handler *err, struct io_uring *ring, struct io_uring_cqe *cqe);
288
default_error(struct error_handler * err,struct io_uring * ring,struct io_uring_cqe * cqe)289 static int default_error(struct error_handler *err,
290 struct io_uring __attribute__((__unused__)) *ring,
291 struct io_uring_cqe *cqe)
292 {
293 struct conn *c = cqe_to_conn(cqe);
294
295 fprintf(stderr, "%d: %s error %s\n", c->tid, err->name, strerror(-cqe->res));
296 fprintf(stderr, "fd=%d, bid=%d\n", cqe_to_fd(cqe), cqe_to_bid(cqe));
297 return 1;
298 }
299
300 /*
301 * Move error handling out of the normal handling path, cleanly seperating
302 * them. If an opcode doesn't need any error handling, set it to NULL. If
303 * it wants to stop the connection at that point and not do anything else,
304 * then the default handler can be used. Only receive has proper error
305 * handling, as we can get -ENOBUFS which is not a fatal condition. It just
306 * means we need to wait on buffer replenishing before re-arming the receive.
307 */
308 static struct error_handler error_handlers[] = {
309 { .name = "NULL", .error_fn = NULL, },
310 { .name = "ACCEPT", .error_fn = default_error, },
311 { .name = "SOCK", .error_fn = default_error, },
312 { .name = "CONNECT", .error_fn = default_error, },
313 { .name = "RECV", .error_fn = recv_error, },
314 { .name = "RECVMSG", .error_fn = recv_error, },
315 { .name = "SEND", .error_fn = send_error, },
316 { .name = "SENDMSG", .error_fn = send_error, },
317 { .name = "SHUTDOWN", .error_fn = NULL, },
318 { .name = "CANCEL", .error_fn = NULL, },
319 { .name = "CLOSE", .error_fn = NULL, },
320 { .name = "FD_PASS", .error_fn = default_error, },
321 { .name = "NOP", .error_fn = NULL, },
322 { .name = "STOP", .error_fn = default_error, },
323 };
324
free_buffer_ring(struct io_uring * ring,struct conn_buf_ring * cbr)325 static void free_buffer_ring(struct io_uring *ring, struct conn_buf_ring *cbr)
326 {
327 if (!cbr->br)
328 return;
329
330 io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
331 cbr->br = NULL;
332 if (use_huge)
333 munmap(cbr->buf, buf_size * nr_bufs);
334 else
335 free(cbr->buf);
336 }
337
free_buffer_rings(struct io_uring * ring,struct conn * c)338 static void free_buffer_rings(struct io_uring *ring, struct conn *c)
339 {
340 free_buffer_ring(ring, &c->in_br);
341 free_buffer_ring(ring, &c->out_br);
342 }
343
344 /*
345 * Setup a ring provided buffer ring for each connection. If we get -ENOBUFS
346 * on receive, for multishot receive we'll wait for half the provided buffers
347 * to be returned by pending sends, then re-arm the multishot receive. If
348 * this happens too frequently (see enobufs= stat), then the ring size is
349 * likely too small. Use -nXX to make it bigger. See recv_enobufs().
350 *
351 * The alternative here would be to use the older style provided buffers,
352 * where you simply setup a buffer group and use SQEs with
353 * io_urign_prep_provide_buffers() to add to the pool. But that approach is
354 * slower and has been deprecated by using the faster ring provided buffers.
355 */
setup_recv_ring(struct io_uring * ring,struct conn * c)356 static int setup_recv_ring(struct io_uring *ring, struct conn *c)
357 {
358 struct conn_buf_ring *cbr = &c->in_br;
359 int ret, i;
360 size_t len;
361 void *ptr;
362
363 len = buf_size * nr_bufs;
364 if (use_huge) {
365 cbr->buf = mmap(NULL, len, PROT_READ|PROT_WRITE,
366 MAP_PRIVATE|MAP_HUGETLB|MAP_HUGE_2MB|MAP_ANONYMOUS,
367 -1, 0);
368 if (cbr->buf == MAP_FAILED) {
369 perror("mmap");
370 return 1;
371 }
372 } else {
373 if (posix_memalign(&cbr->buf, page_size, len)) {
374 perror("posix memalign");
375 return 1;
376 }
377 }
378 cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
379 if (!cbr->br) {
380 fprintf(stderr, "Buffer ring register failed %d\n", ret);
381 return 1;
382 }
383
384 ptr = cbr->buf;
385 for (i = 0; i < nr_bufs; i++) {
386 vlog("%d: add bid %d, data %p\n", c->tid, i, ptr);
387 io_uring_buf_ring_add(cbr->br, ptr, buf_size, i, br_mask, i);
388 ptr += buf_size;
389 }
390 io_uring_buf_ring_advance(cbr->br, nr_bufs);
391 printf("%d: recv buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
392 return 0;
393 }
394
395 /*
396 * If 'send_ring' is used and the kernel supports it, we can skip serializing
397 * sends as the data will be ordered regardless. This reduces the send handling
398 * complexity, as buffers can always be added to the outgoing ring and will be
399 * processed in the order in which they were added.
400 */
setup_send_ring(struct io_uring * ring,struct conn * c)401 static int setup_send_ring(struct io_uring *ring, struct conn *c)
402 {
403 struct conn_buf_ring *cbr = &c->out_br;
404 int ret;
405
406 cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, 0, &ret);
407 if (!cbr->br) {
408 fprintf(stderr, "Buffer ring register failed %d\n", ret);
409 return 1;
410 }
411
412 printf("%d: send buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
413 return 0;
414 }
415
setup_send_zc(struct io_uring * ring,struct conn * c)416 static int setup_send_zc(struct io_uring *ring, struct conn *c)
417 {
418 struct iovec *iovs;
419 void *buf;
420 int i, ret;
421
422 if (snd_msg)
423 return 0;
424
425 buf = c->in_br.buf;
426 iovs = calloc(nr_bufs, sizeof(struct iovec));
427 for (i = 0; i < nr_bufs; i++) {
428 iovs[i].iov_base = buf;
429 iovs[i].iov_len = buf_size;
430 buf += buf_size;
431 }
432
433 ret = io_uring_register_buffers(ring, iovs, nr_bufs);
434 if (ret) {
435 fprintf(stderr, "failed registering buffers: %d\n", ret);
436 free(iovs);
437 return ret;
438 }
439 free(iovs);
440 return 0;
441 }
442
443 /*
444 * Setup an input and output buffer ring.
445 */
setup_buffer_rings(struct io_uring * ring,struct conn * c)446 static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
447 {
448 int ret;
449
450 /* no locking needed on cur_bgid, parent serializes setup */
451 c->in_br.bgid = cur_bgid++;
452 c->out_br.bgid = cur_bgid++;
453 c->out_br.br = NULL;
454
455 ret = setup_recv_ring(ring, c);
456 if (ret)
457 return ret;
458 if (is_sink)
459 return 0;
460 if (snd_zc) {
461 ret = setup_send_zc(ring, c);
462 if (ret)
463 return ret;
464 }
465 if (send_ring) {
466 ret = setup_send_ring(ring, c);
467 if (ret) {
468 free_buffer_ring(ring, &c->in_br);
469 return ret;
470 }
471 }
472
473 return 0;
474 }
475
/* One histogram entry: how many times 'nr_packets' packets were handled */
struct bucket_stat {
	int nr_packets;
	int count;
};

/*
 * qsort() comparator ordering bucket_stat entries by descending count:
 * 1 if p1 has the smaller count, -1 if it has the larger one, 0 if equal.
 */
static int stat_cmp(const void *p1, const void *p2)
{
	const struct bucket_stat *lhs = p1;
	const struct bucket_stat *rhs = p2;

	return (lhs->count < rhs->count) - (lhs->count > rhs->count);
}
492
show_buckets(struct conn_dir * cd)493 static void show_buckets(struct conn_dir *cd)
494 {
495 unsigned long snd_total, rcv_total;
496 struct bucket_stat *rstat, *sstat;
497 int i;
498
499 if (!cd->rcv_bucket || !cd->snd_bucket)
500 return;
501
502 rstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
503 sstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
504
505 snd_total = rcv_total = 0;
506 for (i = 0; i <= nr_bufs; i++) {
507 snd_total += cd->snd_bucket[i];
508 sstat[i].nr_packets = i;
509 sstat[i].count = cd->snd_bucket[i];
510 rcv_total += cd->rcv_bucket[i];
511 rstat[i].nr_packets = i;
512 rstat[i].count = cd->rcv_bucket[i];
513 }
514
515 if (!snd_total && !rcv_total) {
516 free(sstat);
517 free(rstat);
518 }
519 if (snd_total)
520 qsort(sstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
521 if (rcv_total)
522 qsort(rstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
523
524 printf("\t Packets per recv/send:\n");
525 for (i = 0; i <= nr_bufs; i++) {
526 double snd_prc = 0.0, rcv_prc = 0.0;
527 if (!rstat[i].count && !sstat[i].count)
528 continue;
529 if (rstat[i].count)
530 rcv_prc = 100.0 * (rstat[i].count / (double) rcv_total);
531 if (sstat[i].count)
532 snd_prc = 100.0 * (sstat[i].count / (double) snd_total);
533 printf("\t bucket(%3d/%3d): rcv=%u (%.2f%%) snd=%u (%.2f%%)\n",
534 rstat[i].nr_packets, sstat[i].nr_packets,
535 rstat[i].count, rcv_prc,
536 sstat[i].count, snd_prc);
537 }
538
539 free(sstat);
540 free(rstat);
541 }
542
__show_stats(struct conn * c)543 static void __show_stats(struct conn *c)
544 {
545 unsigned long msec, qps;
546 unsigned long bytes, bw;
547 struct conn_dir *cd;
548 int i;
549
550 if (c->flags & (CONN_F_STATS_SHOWN | CONN_F_REAPED))
551 return;
552 if (!(c->flags & CONN_F_STARTED))
553 return;
554
555 if (!(c->flags & CONN_F_END_TIME))
556 gettimeofday(&c->end_time, NULL);
557
558 msec = (c->end_time.tv_sec - c->start_time.tv_sec) * 1000;
559 msec += (c->end_time.tv_usec - c->start_time.tv_usec) / 1000;
560
561 qps = 0;
562 for (i = 0; i < 2; i++)
563 qps += c->cd[i].rcv + c->cd[i].snd;
564
565 if (!qps)
566 return;
567
568 if (msec)
569 qps = (qps * 1000) / msec;
570
571 printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
572 c->in_fd, c->out_fd, qps, msec);
573
574 bytes = 0;
575 for (i = 0; i < 2; i++) {
576 cd = &c->cd[i];
577
578 if (!cd->in_bytes && !cd->out_bytes && !cd->snd && !cd->rcv)
579 continue;
580
581 bytes += cd->in_bytes;
582 bytes += cd->out_bytes;
583
584 printf("\t%3d: rcv=%u (short=%u, enobufs=%d), snd=%u (short=%u,"
585 " busy=%u, enobufs=%d)\n", i, cd->rcv, cd->rcv_shrt,
586 cd->rcv_enobufs, cd->snd, cd->snd_shrt, cd->snd_busy,
587 cd->snd_enobufs);
588 printf("\t : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
589 cd->in_bytes, cd->in_bytes >> 10,
590 cd->out_bytes, cd->out_bytes >> 10);
591 printf("\t : mshot_rcv=%d, mshot_snd=%d\n", cd->rcv_mshot,
592 cd->snd_mshot);
593 show_buckets(cd);
594
595 }
596 if (msec) {
597 bytes *= 8UL;
598 bw = bytes / 1000;
599 bw /= msec;
600 printf("\tBW=%'luMbit\n", bw);
601 }
602
603 c->flags |= CONN_F_STATS_SHOWN;
604 }
605
show_stats(void)606 static void show_stats(void)
607 {
608 float events_per_loop = 0.0;
609 static int stats_shown;
610 int i;
611
612 if (stats_shown)
613 return;
614
615 if (events)
616 events_per_loop = (float) events / (float) event_loops;
617
618 printf("Event loops: %lu, events %lu, events per loop %.2f\n", event_loops,
619 events, events_per_loop);
620
621 for (i = 0; i < MAX_CONNS; i++) {
622 struct conn *c = &conns[i];
623
624 __show_stats(c);
625 }
626 stats_shown = 1;
627 }
628
/* Interrupted: end the current output line, dump stats, exit with status 1 */
static void sig_int(int __attribute__((__unused__)) sig)
{
	fputs("\n", stdout);
	show_stats();
	exit(1);
}
635
636 /*
637 * Special cased for SQPOLL only, as we don't control when SQEs are consumed if
638 * that is used. Hence we may need to wait for the SQPOLL thread to keep up
639 * until we can get a new SQE. All other cases will break immediately, with a
640 * fresh SQE.
641 *
642 * If we grossly undersized our SQ ring, getting a NULL sqe can happen even
643 * for the !SQPOLL case if we're handling a lot of CQEs in our event loop
644 * and multishot isn't used. We can do io_uring_submit() to flush what we
645 * have here. Only caveat here is that if linked requests are used, SQEs
646 * would need to be allocated upfront as a link chain is only valid within
647 * a single submission cycle.
648 */
get_sqe(struct io_uring * ring)649 static struct io_uring_sqe *get_sqe(struct io_uring *ring)
650 {
651 struct io_uring_sqe *sqe;
652
653 do {
654 sqe = io_uring_get_sqe(ring);
655 if (sqe)
656 break;
657 if (!sqpoll)
658 io_uring_submit(ring);
659 else
660 io_uring_sqring_wait(ring);
661 } while (1);
662
663 return sqe;
664 }
665
666 /*
667 * See __encode_userdata() for how we encode sqe->user_data, which is passed
668 * back as cqe->user_data at completion time.
669 */
encode_userdata(struct io_uring_sqe * sqe,struct conn * c,int op,int bid,int fd)670 static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
671 int bid, int fd)
672 {
673 __encode_userdata(sqe, c->tid, op, bid, fd);
674 }
675
__submit_receive(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd)676 static void __submit_receive(struct io_uring *ring, struct conn *c,
677 struct conn_dir *cd, int fd)
678 {
679 struct conn_buf_ring *cbr = &c->in_br;
680 struct io_uring_sqe *sqe;
681
682 vlog("%d: submit receive fd=%d\n", c->tid, fd);
683
684 assert(!cd->pending_recv);
685 cd->pending_recv = 1;
686
687 /*
688 * For both recv and multishot receive, we use the ring provided
689 * buffers. These are handed to the application ahead of time, and
690 * are consumed when a receive triggers. Note that the address and
691 * length of the receive are set to NULL/0, and we assign the
692 * sqe->buf_group to tell the kernel which buffer group ID to pick
693 * a buffer from. Finally, IOSQE_BUFFER_SELECT is set to tell the
694 * kernel that we want a buffer picked for this request, we are not
695 * passing one in with the request.
696 */
697 sqe = get_sqe(ring);
698 if (rcv_msg) {
699 struct io_msg *imsg = &cd->io_rcv_msg;
700 struct msghdr *msg = &imsg->msg;
701
702 memset(msg, 0, sizeof(*msg));
703 msg->msg_iov = msg_vec(imsg)->iov;
704 msg->msg_iovlen = msg_vec(imsg)->iov_len;
705
706 if (recv_mshot) {
707 cd->rcv_mshot++;
708 io_uring_prep_recvmsg_multishot(sqe, fd, &imsg->msg, 0);
709 } else {
710 io_uring_prep_recvmsg(sqe, fd, &imsg->msg, 0);
711 }
712 } else {
713 if (recv_mshot) {
714 cd->rcv_mshot++;
715 io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0);
716 } else {
717 io_uring_prep_recv(sqe, fd, NULL, 0, 0);
718 }
719 }
720 encode_userdata(sqe, c, __RECV, 0, fd);
721 sqe->buf_group = cbr->bgid;
722 sqe->flags |= IOSQE_BUFFER_SELECT;
723 if (fixed_files)
724 sqe->flags |= IOSQE_FIXED_FILE;
725 if (rcv_bundle)
726 sqe->ioprio |= IORING_RECVSEND_BUNDLE;
727 }
728
729 /*
730 * One directional just arms receive on our in_fd
731 */
submit_receive(struct io_uring * ring,struct conn * c)732 static void submit_receive(struct io_uring *ring, struct conn *c)
733 {
734 __submit_receive(ring, c, &c->cd[0], c->in_fd);
735 }
736
737 /*
738 * Bi-directional arms receive on both in and out fd
739 */
submit_bidi_receive(struct io_uring * ring,struct conn * c)740 static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
741 {
742 __submit_receive(ring, c, &c->cd[0], c->in_fd);
743 __submit_receive(ring, c, &c->cd[1], c->out_fd);
744 }
745
746 /*
747 * We hit -ENOBUFS, which means that we ran out of buffers in our current
748 * provided buffer group. This can happen if there's an imbalance between the
749 * receives coming in and the sends being processed, particularly with multishot
750 * receive as they can trigger very quickly. If this happens, defer arming a
751 * new receive until we've replenished half of the buffer pool by processing
752 * pending sends.
753 */
recv_enobufs(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd)754 static void recv_enobufs(struct io_uring *ring, struct conn *c,
755 struct conn_dir *cd, int fd)
756 {
757 vlog("%d: enobufs hit\n", c->tid);
758
759 cd->rcv_enobufs++;
760
761 /*
762 * If we're a sink, mark rcv as rearm. If we're not, then mark us as
763 * needing a rearm for receive and send. The completing send will
764 * kick the recv rearm.
765 */
766 if (!is_sink) {
767 int do_recv_arm = 1;
768
769 if (!cd->pending_send)
770 do_recv_arm = !prep_next_send(ring, c, cd, fd);
771 if (do_recv_arm)
772 __submit_receive(ring, c, &c->cd[0], c->in_fd);
773 } else {
774 __submit_receive(ring, c, &c->cd[0], c->in_fd);
775 }
776 }
777
778 /*
779 * Kill this socket - submit a shutdown and link a close to it. We don't
780 * care about shutdown status, so mark it as not needing to post a CQE unless
781 * it fails.
782 */
queue_shutdown_close(struct io_uring * ring,struct conn * c,int fd)783 static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
784 {
785 struct io_uring_sqe *sqe1, *sqe2;
786
787 /*
788 * On the off chance that we run out of SQEs after the first one,
789 * grab two upfront. This it to prevent our link not working if
790 * get_sqe() ends up doing submissions to free up an SQE, as links
791 * are not valid across separate submissions.
792 */
793 sqe1 = get_sqe(ring);
794 sqe2 = get_sqe(ring);
795
796 io_uring_prep_shutdown(sqe1, fd, SHUT_RDWR);
797 if (fixed_files)
798 sqe1->flags |= IOSQE_FIXED_FILE;
799 sqe1->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
800 encode_userdata(sqe1, c, __SHUTDOWN, 0, fd);
801
802 if (fixed_files)
803 io_uring_prep_close_direct(sqe2, fd);
804 else
805 io_uring_prep_close(sqe2, fd);
806 encode_userdata(sqe2, c, __CLOSE, 0, fd);
807 }
808
809 /*
810 * This connection is going away, queue a cancel for any pending recv, for
811 * example, we have pending for this ring. For completeness, we issue a cancel
812 * for any request we have pending for both in_fd and out_fd.
813 */
queue_cancel(struct io_uring * ring,struct conn * c)814 static void queue_cancel(struct io_uring *ring, struct conn *c)
815 {
816 struct io_uring_sqe *sqe;
817 int flags = 0;
818
819 if (fixed_files)
820 flags |= IORING_ASYNC_CANCEL_FD_FIXED;
821
822 sqe = get_sqe(ring);
823 io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
824 encode_userdata(sqe, c, __CANCEL, 0, c->in_fd);
825 c->pending_cancels++;
826
827 if (c->out_fd != -1) {
828 sqe = get_sqe(ring);
829 io_uring_prep_cancel_fd(sqe, c->out_fd, flags);
830 encode_userdata(sqe, c, __CANCEL, 0, c->out_fd);
831 c->pending_cancels++;
832 }
833
834 io_uring_submit(ring);
835 }
836
pending_shutdown(struct conn * c)837 static int pending_shutdown(struct conn *c)
838 {
839 return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
840 }
841
should_shutdown(struct conn * c)842 static bool should_shutdown(struct conn *c)
843 {
844 int i;
845
846 if (!pending_shutdown(c))
847 return false;
848 if (is_sink)
849 return true;
850 if (!bidi)
851 return c->cd[0].in_bytes == c->cd[1].out_bytes;
852
853 for (i = 0; i < 2; i++) {
854 if (c->cd[0].rcv != c->cd[1].snd)
855 return false;
856 if (c->cd[1].rcv != c->cd[0].snd)
857 return false;
858 }
859
860 return true;
861 }
862
863 /*
864 * Close this connection - send a ring message to the connection with intent
865 * to stop. When the client gets the message, it will initiate the stop.
866 */
__close_conn(struct io_uring * ring,struct conn * c)867 static void __close_conn(struct io_uring *ring, struct conn *c)
868 {
869 struct io_uring_sqe *sqe;
870 uint64_t user_data;
871
872 printf("Client %d: queueing stop\n", c->tid);
873
874 user_data = __raw_encode(c->tid, __STOP, 0, 0);
875 sqe = io_uring_get_sqe(ring);
876 io_uring_prep_msg_ring(sqe, c->ring.ring_fd, 0, user_data, 0);
877 encode_userdata(sqe, c, __NOP, 0, 0);
878 io_uring_submit(ring);
879 }
880
close_cd(struct conn * c,struct conn_dir * cd)881 static void close_cd(struct conn *c, struct conn_dir *cd)
882 {
883 cd->pending_shutdown = 1;
884
885 if (cd->pending_send)
886 return;
887
888 if (!(c->flags & CONN_F_PENDING_SHUTDOWN)) {
889 gettimeofday(&c->end_time, NULL);
890 c->flags |= CONN_F_PENDING_SHUTDOWN | CONN_F_END_TIME;
891 }
892 }
893
894 /*
895 * We're done with this buffer, add it back to our pool so the kernel is
896 * free to use it again.
897 */
replenish_buffer(struct conn_buf_ring * cbr,int bid,int offset)898 static int replenish_buffer(struct conn_buf_ring *cbr, int bid, int offset)
899 {
900 void *this_buf = cbr->buf + bid * buf_size;
901
902 assert(bid < nr_bufs);
903
904 io_uring_buf_ring_add(cbr->br, this_buf, buf_size, bid, br_mask, offset);
905 return buf_size;
906 }
907
908 /*
909 * Iterate buffers from '*bid' and with a total size of 'bytes' and add them
910 * back to our receive ring so they can be reused for new receives.
911 */
replenish_buffers(struct conn * c,int * bid,int bytes)912 static int replenish_buffers(struct conn *c, int *bid, int bytes)
913 {
914 struct conn_buf_ring *cbr = &c->in_br;
915 int nr_packets = 0;
916
917 while (bytes) {
918 int this_len = replenish_buffer(cbr, *bid, nr_packets);
919
920 if (this_len > bytes)
921 this_len = bytes;
922 bytes -= this_len;
923
924 *bid = (*bid + 1) & (nr_bufs - 1);
925 nr_packets++;
926 }
927
928 io_uring_buf_ring_advance(cbr->br, nr_packets);
929 return nr_packets;
930 }
931
free_mvec(struct msg_vec * mvec)932 static void free_mvec(struct msg_vec *mvec)
933 {
934 free(mvec->iov);
935 mvec->iov = NULL;
936 }
937
init_mvec(struct msg_vec * mvec)938 static void init_mvec(struct msg_vec *mvec)
939 {
940 memset(mvec, 0, sizeof(*mvec));
941 mvec->iov = malloc(sizeof(struct iovec));
942 mvec->vec_size = 1;
943 }
944
init_msgs(struct conn_dir * cd)945 static void init_msgs(struct conn_dir *cd)
946 {
947 memset(&cd->io_snd_msg, 0, sizeof(cd->io_snd_msg));
948 memset(&cd->io_rcv_msg, 0, sizeof(cd->io_rcv_msg));
949 init_mvec(&cd->io_snd_msg.vecs[0]);
950 init_mvec(&cd->io_snd_msg.vecs[1]);
951 init_mvec(&cd->io_rcv_msg.vecs[0]);
952 }
953
free_msgs(struct conn_dir * cd)954 static void free_msgs(struct conn_dir *cd)
955 {
956 free_mvec(&cd->io_snd_msg.vecs[0]);
957 free_mvec(&cd->io_snd_msg.vecs[1]);
958 free_mvec(&cd->io_rcv_msg.vecs[0]);
959 }
960
961 /*
962 * Multishot accept completion triggered. If we're acting as a sink, we're
963 * good to go. Just issue a receive for that case. If we're acting as a proxy,
964 * then start opening a socket that we can use to connect to the other end.
965 */
handle_accept(struct io_uring * ring,struct io_uring_cqe * cqe)966 static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
967 {
968 struct conn *c;
969 int i;
970
971 if (nr_conns == MAX_CONNS) {
972 fprintf(stderr, "max clients reached %d\n", nr_conns);
973 return 1;
974 }
975
976 /* main thread handles this, which is obviously serialized */
977 c = &conns[nr_conns];
978 c->tid = nr_conns++;
979 c->in_fd = -1;
980 c->out_fd = -1;
981
982 for (i = 0; i < 2; i++) {
983 struct conn_dir *cd = &c->cd[i];
984
985 cd->index = i;
986 cd->snd_next_bid = -1;
987 cd->rcv_next_bid = -1;
988 if (ext_stat) {
989 cd->rcv_bucket = calloc(nr_bufs + 1, sizeof(int));
990 cd->snd_bucket = calloc(nr_bufs + 1, sizeof(int));
991 }
992 init_msgs(cd);
993 }
994
995 printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
996 gettimeofday(&c->start_time, NULL);
997
998 pthread_barrier_init(&c->startup_barrier, NULL, 2);
999 pthread_create(&c->thread, NULL, thread_main, c);
1000
1001 /*
1002 * Wait for thread to have its ring setup, then either assign the fd
1003 * if it's non-fixed, or pass the fixed one
1004 */
1005 pthread_barrier_wait(&c->startup_barrier);
1006 if (!fixed_files) {
1007 c->in_fd = cqe->res;
1008 } else {
1009 struct io_uring_sqe *sqe;
1010 uint64_t user_data;
1011
1012 /*
1013 * Ring has just been setup, we'll use index 0 as the descriptor
1014 * value.
1015 */
1016 user_data = __raw_encode(c->tid, __FD_PASS, 0, 0);
1017 sqe = io_uring_get_sqe(ring);
1018 io_uring_prep_msg_ring_fd(sqe, c->ring.ring_fd, cqe->res, 0,
1019 user_data, 0);
1020 encode_userdata(sqe, c, __NOP, 0, cqe->res);
1021 }
1022
1023 return 0;
1024 }
1025
1026 /*
1027 * Our socket request completed, issue a connect request to the other end.
1028 */
handle_sock(struct io_uring * ring,struct io_uring_cqe * cqe)1029 static int handle_sock(struct io_uring *ring, struct io_uring_cqe *cqe)
1030 {
1031 struct conn *c = cqe_to_conn(cqe);
1032 struct io_uring_sqe *sqe;
1033 int ret;
1034
1035 vlog("%d: sock: res=%d\n", c->tid, cqe->res);
1036
1037 c->out_fd = cqe->res;
1038
1039 if (ipv6) {
1040 memset(&c->addr6, 0, sizeof(c->addr6));
1041 c->addr6.sin6_family = AF_INET6;
1042 c->addr6.sin6_port = htons(send_port);
1043 ret = inet_pton(AF_INET6, host, &c->addr6.sin6_addr);
1044 } else {
1045 memset(&c->addr, 0, sizeof(c->addr));
1046 c->addr.sin_family = AF_INET;
1047 c->addr.sin_port = htons(send_port);
1048 ret = inet_pton(AF_INET, host, &c->addr.sin_addr);
1049 }
1050 if (ret <= 0) {
1051 if (!ret)
1052 fprintf(stderr, "host not in right format\n");
1053 else
1054 perror("inet_pton");
1055 return 1;
1056 }
1057
1058 sqe = get_sqe(ring);
1059 if (ipv6) {
1060 io_uring_prep_connect(sqe, c->out_fd,
1061 (struct sockaddr *) &c->addr6,
1062 sizeof(c->addr6));
1063 } else {
1064 io_uring_prep_connect(sqe, c->out_fd,
1065 (struct sockaddr *) &c->addr,
1066 sizeof(c->addr));
1067 }
1068 encode_userdata(sqe, c, __CONNECT, 0, c->out_fd);
1069 if (fixed_files)
1070 sqe->flags |= IOSQE_FIXED_FILE;
1071 return 0;
1072 }
1073
1074 /*
1075 * Connection to the other end is done, submit a receive to start receiving
1076 * data. If we're a bidirectional proxy, issue a receive on both ends. If not,
1077 * then just a single recv will do.
1078 */
handle_connect(struct io_uring * ring,struct io_uring_cqe * cqe)1079 static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
1080 {
1081 struct conn *c = cqe_to_conn(cqe);
1082
1083 pthread_mutex_lock(&thread_lock);
1084 open_conns++;
1085 pthread_mutex_unlock(&thread_lock);
1086
1087 if (bidi)
1088 submit_bidi_receive(ring, c);
1089 else
1090 submit_receive(ring, c);
1091
1092 return 0;
1093 }
1094
1095 /*
1096 * Append new segment to our currently active msg_vec. This will be submitted
1097 * as a sendmsg (with all of it), or as separate sends, later. If we're using
1098 * send_ring, then we won't hit this path. Instead, outgoing buffers are
1099 * added directly to our outgoing send buffer ring.
1100 */
send_append_vec(struct conn_dir * cd,void * data,int len)1101 static void send_append_vec(struct conn_dir *cd, void *data, int len)
1102 {
1103 struct msg_vec *mvec = snd_msg_vec(cd);
1104
1105 if (mvec->iov_len == mvec->vec_size) {
1106 mvec->vec_size <<= 1;
1107 mvec->iov = realloc(mvec->iov, mvec->vec_size * sizeof(struct iovec));
1108 }
1109
1110 mvec->iov[mvec->iov_len].iov_base = data;
1111 mvec->iov[mvec->iov_len].iov_len = len;
1112 mvec->iov_len++;
1113 }
1114
1115 /*
1116 * Queue a send based on the data received in this cqe, which came from
1117 * a completed receive operation.
1118 */
send_append(struct conn * c,struct conn_dir * cd,void * data,int bid,int len)1119 static void send_append(struct conn *c, struct conn_dir *cd, void *data,
1120 int bid, int len)
1121 {
1122 vlog("%d: send %d (%p, bid %d)\n", c->tid, len, data, bid);
1123
1124 assert(bid < nr_bufs);
1125
1126 /* if using provided buffers for send, add it upfront */
1127 if (send_ring) {
1128 struct conn_buf_ring *cbr = &c->out_br;
1129
1130 io_uring_buf_ring_add(cbr->br, data, len, bid, br_mask, 0);
1131 io_uring_buf_ring_advance(cbr->br, 1);
1132 } else {
1133 send_append_vec(cd, data, len);
1134 }
1135 }
1136
1137 /*
1138 * For non recvmsg && multishot, a zero receive marks the end. For recvmsg
1139 * with multishot, we always get the header regardless. Hence a "zero receive"
1140 * is the size of the header.
1141 */
recv_done_res(int res)1142 static int recv_done_res(int res)
1143 {
1144 if (!res)
1145 return 1;
1146 if (rcv_msg && recv_mshot && res == sizeof(struct io_uring_recvmsg_out))
1147 return 1;
1148 return 0;
1149 }
1150
1151 /*
1152 * Any receive that isn't recvmsg with multishot can be handled the same way.
1153 * Iterate from '*bid' and 'in_bytes' in total, and append the data to the
1154 * outgoing queue.
1155 */
recv_bids(struct conn * c,struct conn_dir * cd,int * bid,int in_bytes)1156 static int recv_bids(struct conn *c, struct conn_dir *cd, int *bid, int in_bytes)
1157 {
1158 struct conn_buf_ring *cbr = &c->out_br;
1159 struct conn_buf_ring *in_cbr = &c->in_br;
1160 struct io_uring_buf *buf;
1161 int nr_packets = 0;
1162
1163 while (in_bytes) {
1164 int this_bytes;
1165 void *data;
1166
1167 buf = &in_cbr->br->bufs[*bid];
1168 data = (void *) (unsigned long) buf->addr;
1169 this_bytes = buf->len;
1170 if (this_bytes > in_bytes)
1171 this_bytes = in_bytes;
1172
1173 in_bytes -= this_bytes;
1174
1175 if (send_ring)
1176 io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1177 br_mask, nr_packets);
1178 else
1179 send_append(c, cd, data, *bid, this_bytes);
1180
1181 *bid = (*bid + 1) & (nr_bufs - 1);
1182 nr_packets++;
1183 }
1184
1185 if (send_ring)
1186 io_uring_buf_ring_advance(cbr->br, nr_packets);
1187
1188 return nr_packets;
1189 }
1190
1191 /*
1192 * Special handling of recvmsg with multishot
1193 */
recv_mshot_msg(struct conn * c,struct conn_dir * cd,int * bid,int in_bytes)1194 static int recv_mshot_msg(struct conn *c, struct conn_dir *cd, int *bid,
1195 int in_bytes)
1196 {
1197 struct conn_buf_ring *cbr = &c->out_br;
1198 struct conn_buf_ring *in_cbr = &c->in_br;
1199 struct io_uring_buf *buf;
1200 int nr_packets = 0;
1201
1202 while (in_bytes) {
1203 struct io_uring_recvmsg_out *pdu;
1204 int this_bytes;
1205 void *data;
1206
1207 buf = &in_cbr->br->bufs[*bid];
1208
1209 /*
1210 * multishot recvmsg puts a header in front of the data - we
1211 * have to take that into account for the send setup, and
1212 * adjust the actual data read to not take this metadata into
1213 * account. For this use case, namelen and controllen will not
1214 * be set. If they were, they would need to be factored in too.
1215 */
1216 buf->len -= sizeof(struct io_uring_recvmsg_out);
1217 in_bytes -= sizeof(struct io_uring_recvmsg_out);
1218
1219 pdu = (void *) (unsigned long) buf->addr;
1220 vlog("pdu namelen %d, controllen %d, payload %d flags %x\n",
1221 pdu->namelen, pdu->controllen, pdu->payloadlen,
1222 pdu->flags);
1223 data = (void *) (pdu + 1);
1224
1225 this_bytes = pdu->payloadlen;
1226 if (this_bytes > in_bytes)
1227 this_bytes = in_bytes;
1228
1229 in_bytes -= this_bytes;
1230
1231 if (send_ring)
1232 io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1233 br_mask, nr_packets);
1234 else
1235 send_append(c, cd, data, *bid, this_bytes);
1236
1237 *bid = (*bid + 1) & (nr_bufs - 1);
1238 nr_packets++;
1239 }
1240
1241 if (send_ring)
1242 io_uring_buf_ring_advance(cbr->br, nr_packets);
1243
1244 return nr_packets;
1245 }
1246
__handle_recv(struct io_uring * ring,struct conn * c,struct conn_dir * cd,struct io_uring_cqe * cqe)1247 static int __handle_recv(struct io_uring *ring, struct conn *c,
1248 struct conn_dir *cd, struct io_uring_cqe *cqe)
1249 {
1250 struct conn_dir *ocd = &c->cd[!cd->index];
1251 int bid, nr_packets;
1252
1253 /*
1254 * Not having a buffer attached should only happen if we get a zero
1255 * sized receive, because the other end closed the connection. It
1256 * cannot happen otherwise, as all our receives are using provided
1257 * buffers and hence it's not possible to return a CQE with a non-zero
1258 * result and not have a buffer attached.
1259 */
1260 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1261 cd->pending_recv = 0;
1262
1263 if (!recv_done_res(cqe->res)) {
1264 fprintf(stderr, "no buffer assigned, res=%d\n", cqe->res);
1265 return 1;
1266 }
1267 start_close:
1268 prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1269 close_cd(c, cd);
1270 return 0;
1271 }
1272
1273 if (cqe->res && cqe->res < buf_size)
1274 cd->rcv_shrt++;
1275
1276 bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1277
1278 /*
1279 * BIDI will use the same buffer pool and do receive on both CDs,
1280 * so can't reliably check. TODO.
1281 */
1282 if (!bidi && cd->rcv_next_bid != -1 && bid != cd->rcv_next_bid) {
1283 fprintf(stderr, "recv bid %d, wanted %d\n", bid, cd->rcv_next_bid);
1284 goto start_close;
1285 }
1286
1287 vlog("%d: recv: bid=%d, res=%d, cflags=%x\n", c->tid, bid, cqe->res, cqe->flags);
1288 /*
1289 * If we're a sink, we're done here. Just replenish the buffer back
1290 * to the pool. For proxy mode, we will send the data to the other
1291 * end and the buffer will be replenished once the send is done with
1292 * it.
1293 */
1294 if (is_sink)
1295 nr_packets = replenish_buffers(c, &bid, cqe->res);
1296 else if (rcv_msg && recv_mshot)
1297 nr_packets = recv_mshot_msg(c, ocd, &bid, cqe->res);
1298 else
1299 nr_packets = recv_bids(c, ocd, &bid, cqe->res);
1300
1301 if (cd->rcv_bucket)
1302 cd->rcv_bucket[nr_packets]++;
1303
1304 if (!is_sink) {
1305 ocd->out_buffers += nr_packets;
1306 assert(ocd->out_buffers <= nr_bufs);
1307 }
1308
1309 cd->rcv++;
1310 cd->rcv_next_bid = bid;
1311
1312 /*
1313 * If IORING_CQE_F_MORE isn't set, then this is either a normal recv
1314 * that needs rearming, or it's a multishot that won't post any further
1315 * completions. Setup a new one for these cases.
1316 */
1317 if (!(cqe->flags & IORING_CQE_F_MORE)) {
1318 cd->pending_recv = 0;
1319 if (recv_done_res(cqe->res))
1320 goto start_close;
1321 if (is_sink)
1322 __submit_receive(ring, c, &c->cd[0], c->in_fd);
1323 }
1324
1325 /*
1326 * Submit a send if we won't get anymore notifications from this
1327 * recv, or if we have nr_bufs / 2 queued up. If BIDI mode, send
1328 * every buffer. We assume this is interactive mode, and hence don't
1329 * delay anything.
1330 */
1331 if (((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
1332 !(cqe->flags & IORING_CQE_F_MORE)) && !is_sink)
1333 prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1334
1335 if (!recv_done_res(cqe->res))
1336 cd->in_bytes += cqe->res;
1337 return 0;
1338 }
1339
/*
 * Receive completion entry point: look up the connection and direction
 * this CQE belongs to and pass it on to __handle_recv().
 */
static int handle_recv(struct io_uring *ring, struct io_uring_cqe *cqe)
{
	struct conn *conn = cqe_to_conn(cqe);

	return __handle_recv(ring, conn, cqe_to_conn_dir(conn, cqe), cqe);
}
1347
recv_error(struct error_handler * err,struct io_uring * ring,struct io_uring_cqe * cqe)1348 static int recv_error(struct error_handler *err, struct io_uring *ring,
1349 struct io_uring_cqe *cqe)
1350 {
1351 struct conn *c = cqe_to_conn(cqe);
1352 struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1353
1354 cd->pending_recv = 0;
1355
1356 if (cqe->res != -ENOBUFS)
1357 return default_error(err, ring, cqe);
1358
1359 recv_enobufs(ring, c, cd, other_dir_fd(c, cqe_to_fd(cqe)));
1360 return 0;
1361 }
1362
submit_send(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd,void * data,int len,int bid,int flags)1363 static void submit_send(struct io_uring *ring, struct conn *c,
1364 struct conn_dir *cd, int fd, void *data, int len,
1365 int bid, int flags)
1366 {
1367 struct io_uring_sqe *sqe;
1368 int bgid = c->out_br.bgid;
1369
1370 if (cd->pending_send)
1371 return;
1372 cd->pending_send = 1;
1373
1374 flags |= MSG_WAITALL | MSG_NOSIGNAL;
1375
1376 sqe = get_sqe(ring);
1377 if (snd_msg) {
1378 struct io_msg *imsg = &cd->io_snd_msg;
1379
1380 if (snd_zc) {
1381 io_uring_prep_sendmsg_zc(sqe, fd, &imsg->msg, flags);
1382 cd->snd_notif++;
1383 } else {
1384 io_uring_prep_sendmsg(sqe, fd, &imsg->msg, flags);
1385 }
1386 } else if (send_ring) {
1387 io_uring_prep_send(sqe, fd, NULL, 0, flags);
1388 } else if (!snd_zc) {
1389 io_uring_prep_send(sqe, fd, data, len, flags);
1390 } else {
1391 io_uring_prep_send_zc(sqe, fd, data, len, flags, 0);
1392 sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
1393 sqe->buf_index = bid;
1394 cd->snd_notif++;
1395 }
1396 encode_userdata(sqe, c, __SEND, bid, fd);
1397 if (fixed_files)
1398 sqe->flags |= IOSQE_FIXED_FILE;
1399 if (send_ring) {
1400 sqe->flags |= IOSQE_BUFFER_SELECT;
1401 sqe->buf_group = bgid;
1402 }
1403 if (snd_bundle) {
1404 sqe->ioprio |= IORING_RECVSEND_BUNDLE;
1405 cd->snd_mshot++;
1406 } else if (send_ring)
1407 cd->snd_mshot++;
1408 }
1409
1410 /*
1411 * Prepare the next send request, if we need to. If one is already pending,
1412 * or if we're a sink and we don't need to do sends, then there's nothing
1413 * to do.
1414 *
1415 * Return 1 if another send completion is expected, 0 if not.
1416 */
prep_next_send(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd)1417 static int prep_next_send(struct io_uring *ring, struct conn *c,
1418 struct conn_dir *cd, int fd)
1419 {
1420 int bid;
1421
1422 if (cd->pending_send || is_sink)
1423 return 0;
1424 if (!cd->out_buffers)
1425 return 0;
1426
1427 bid = cd->snd_next_bid;
1428 if (bid == -1)
1429 bid = 0;
1430
1431 if (send_ring) {
1432 /*
1433 * send_ring mode is easy, there's nothing to do but submit
1434 * our next send request. That will empty the entire outgoing
1435 * queue.
1436 */
1437 submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1438 return 1;
1439 } else if (snd_msg) {
1440 /*
1441 * For sendmsg mode, submit our currently prepared iovec, if
1442 * we have one, and swap our iovecs so that any further
1443 * receives will start preparing that one.
1444 */
1445 struct io_msg *imsg = &cd->io_snd_msg;
1446
1447 if (!msg_vec(imsg)->iov_len)
1448 return 0;
1449 imsg->msg.msg_iov = msg_vec(imsg)->iov;
1450 imsg->msg.msg_iovlen = msg_vec(imsg)->iov_len;
1451 msg_vec(imsg)->iov_len = 0;
1452 imsg->vec_index = !imsg->vec_index;
1453 submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1454 return 1;
1455 } else {
1456 /*
1457 * send without send_ring - submit the next available vec,
1458 * if any. If this vec is the last one in the current series,
1459 * then swap to the next vec. We flag each send with MSG_MORE,
1460 * unless this is the last part of the current vec.
1461 */
1462 struct io_msg *imsg = &cd->io_snd_msg;
1463 struct msg_vec *mvec = msg_vec(imsg);
1464 int flags = !snd_zc ? MSG_MORE : 0;
1465 struct iovec *iov;
1466
1467 if (mvec->iov_len == mvec->cur_iov)
1468 return 0;
1469 imsg->msg.msg_iov = msg_vec(imsg)->iov;
1470 iov = &mvec->iov[mvec->cur_iov];
1471 mvec->cur_iov++;
1472 if (mvec->cur_iov == mvec->iov_len) {
1473 mvec->iov_len = 0;
1474 mvec->cur_iov = 0;
1475 imsg->vec_index = !imsg->vec_index;
1476 flags = 0;
1477 }
1478 submit_send(ring, c, cd, fd, iov->iov_base, iov->iov_len, bid, flags);
1479 return 1;
1480 }
1481 }
1482
1483 /*
1484 * Handling a send with an outgoing send ring. Get the buffers from the
1485 * receive side, and add them to the ingoing buffer ring again.
1486 */
handle_send_ring(struct conn * c,struct conn_dir * cd,int bid,int bytes)1487 static int handle_send_ring(struct conn *c, struct conn_dir *cd,
1488 int bid, int bytes)
1489 {
1490 struct conn_buf_ring *in_cbr = &c->in_br;
1491 struct conn_buf_ring *out_cbr = &c->out_br;
1492 int i = 0;
1493
1494 while (bytes) {
1495 struct io_uring_buf *buf = &out_cbr->br->bufs[bid];
1496 int this_bytes;
1497 void *this_buf;
1498
1499 this_bytes = buf->len;
1500 if (this_bytes > bytes)
1501 this_bytes = bytes;
1502
1503 cd->out_bytes += this_bytes;
1504
1505 vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1506
1507 this_buf = in_cbr->buf + bid * buf_size;
1508 io_uring_buf_ring_add(in_cbr->br, this_buf, buf_size, bid, br_mask, i);
1509 /*
1510 * Find the provided buffer that the receive consumed, and
1511 * which we then used for the send, and add it back to the
1512 * pool so it can get picked by another receive. Once the send
1513 * is done, we're done with it.
1514 */
1515 bid = (bid + 1) & (nr_bufs - 1);
1516 bytes -= this_bytes;
1517 i++;
1518 }
1519 cd->snd_next_bid = bid;
1520 io_uring_buf_ring_advance(in_cbr->br, i);
1521
1522 if (pending_shutdown(c))
1523 close_cd(c, cd);
1524
1525 return i;
1526 }
1527
1528 /*
1529 * sendmsg, or send without a ring. Just add buffers back to the ingoing
1530 * ring for receives.
1531 */
handle_send_buf(struct conn * c,struct conn_dir * cd,int bid,int bytes)1532 static int handle_send_buf(struct conn *c, struct conn_dir *cd, int bid,
1533 int bytes)
1534 {
1535 struct conn_buf_ring *in_cbr = &c->in_br;
1536 int i = 0;
1537
1538 while (bytes) {
1539 struct io_uring_buf *buf = &in_cbr->br->bufs[bid];
1540 int this_bytes;
1541
1542 this_bytes = bytes;
1543 if (this_bytes > buf->len)
1544 this_bytes = buf->len;
1545
1546 vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1547
1548 cd->out_bytes += this_bytes;
1549 /* each recvmsg mshot package has this overhead */
1550 if (rcv_msg && recv_mshot)
1551 cd->out_bytes += sizeof(struct io_uring_recvmsg_out);
1552 replenish_buffer(in_cbr, bid, i);
1553 bid = (bid + 1) & (nr_bufs - 1);
1554 bytes -= this_bytes;
1555 i++;
1556 }
1557 io_uring_buf_ring_advance(in_cbr->br, i);
1558 cd->snd_next_bid = bid;
1559 return i;
1560 }
1561
__handle_send(struct io_uring * ring,struct conn * c,struct conn_dir * cd,struct io_uring_cqe * cqe)1562 static int __handle_send(struct io_uring *ring, struct conn *c,
1563 struct conn_dir *cd, struct io_uring_cqe *cqe)
1564 {
1565 struct conn_dir *ocd;
1566 int bid, nr_packets;
1567
1568 if (send_ring) {
1569 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1570 fprintf(stderr, "no buffer in send?! %d\n", cqe->res);
1571 return 1;
1572 }
1573 bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1574 } else {
1575 bid = cqe_to_bid(cqe);
1576 }
1577
1578 /*
1579 * CQE notifications only happen with send/sendmsg zerocopy. They
1580 * tell us that the data has been acked, and that hence the buffer
1581 * is now free to reuse. Waiting on an ACK for each packet will slow
1582 * us down tremendously, so do all of our sends and then wait for
1583 * the ACKs to come in. They tend to come in bundles anyway. Once
1584 * all acks are done (cd->snd_notif == 0), then fire off the next
1585 * receive.
1586 */
1587 if (cqe->flags & IORING_CQE_F_NOTIF) {
1588 cd->snd_notif--;
1589 } else {
1590 if (cqe->res && cqe->res < buf_size)
1591 cd->snd_shrt++;
1592
1593 /*
1594 * BIDI will use the same buffer pool and do sends on both CDs,
1595 * so can't reliably check. TODO.
1596 */
1597 if (!bidi && send_ring && cd->snd_next_bid != -1 &&
1598 bid != cd->snd_next_bid) {
1599 fprintf(stderr, "send bid %d, wanted %d at %lu\n", bid,
1600 cd->snd_next_bid, cd->out_bytes);
1601 goto out_close;
1602 }
1603
1604 assert(bid <= nr_bufs);
1605
1606 vlog("send: got %d, %lu\n", cqe->res, cd->out_bytes);
1607
1608 if (send_ring)
1609 nr_packets = handle_send_ring(c, cd, bid, cqe->res);
1610 else
1611 nr_packets = handle_send_buf(c, cd, bid, cqe->res);
1612
1613 if (cd->snd_bucket)
1614 cd->snd_bucket[nr_packets]++;
1615
1616 cd->out_buffers -= nr_packets;
1617 assert(cd->out_buffers >= 0);
1618
1619 cd->snd++;
1620 }
1621
1622 if (!(cqe->flags & IORING_CQE_F_MORE)) {
1623 int do_recv_arm;
1624
1625 cd->pending_send = 0;
1626
1627 /*
1628 * send done - see if the current vec has data to submit, and
1629 * do so if it does. if it doesn't have data yet, nothing to
1630 * do.
1631 */
1632 do_recv_arm = !prep_next_send(ring, c, cd, cqe_to_fd(cqe));
1633
1634 ocd = &c->cd[!cd->index];
1635 if (!cd->snd_notif && do_recv_arm && !ocd->pending_recv) {
1636 int fd = other_dir_fd(c, cqe_to_fd(cqe));
1637
1638 __submit_receive(ring, c, ocd, fd);
1639 }
1640 out_close:
1641 if (pending_shutdown(c))
1642 close_cd(c, cd);
1643 }
1644
1645 vlog("%d: pending sends %d\n", c->tid, cd->pending_send);
1646 return 0;
1647 }
1648
/*
 * Send completion entry point: look up the connection and direction this
 * CQE belongs to and pass it on to __handle_send().
 */
static int handle_send(struct io_uring *ring, struct io_uring_cqe *cqe)
{
	struct conn *conn = cqe_to_conn(cqe);

	return __handle_send(ring, conn, cqe_to_conn_dir(conn, cqe), cqe);
}
1656
send_error(struct error_handler * err,struct io_uring * ring,struct io_uring_cqe * cqe)1657 static int send_error(struct error_handler *err, struct io_uring *ring,
1658 struct io_uring_cqe *cqe)
1659 {
1660 struct conn *c = cqe_to_conn(cqe);
1661 struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1662
1663 cd->pending_send = 0;
1664
1665 /* res can have high bit set */
1666 if (cqe->flags & IORING_CQE_F_NOTIF)
1667 return handle_send(ring, cqe);
1668 if (cqe->res != -ENOBUFS)
1669 return default_error(err, ring, cqe);
1670
1671 cd->snd_enobufs++;
1672 return 0;
1673 }
1674
1675 /*
1676 * We don't expect to get here, as we marked it with skipping posting a
1677 * CQE if it was successful. If it does trigger, than means it fails and
1678 * that our close has not been done. Log the shutdown error and issue a new
1679 * separate close.
1680 */
handle_shutdown(struct io_uring * ring,struct io_uring_cqe * cqe)1681 static int handle_shutdown(struct io_uring *ring, struct io_uring_cqe *cqe)
1682 {
1683 struct conn *c = cqe_to_conn(cqe);
1684 struct io_uring_sqe *sqe;
1685 int fd = cqe_to_fd(cqe);
1686
1687 fprintf(stderr, "Got shutdown notication on fd %d\n", fd);
1688
1689 if (!cqe->res)
1690 fprintf(stderr, "Unexpected success shutdown CQE\n");
1691 else if (cqe->res < 0)
1692 fprintf(stderr, "Shutdown got %s\n", strerror(-cqe->res));
1693
1694 sqe = get_sqe(ring);
1695 if (fixed_files)
1696 io_uring_prep_close_direct(sqe, fd);
1697 else
1698 io_uring_prep_close(sqe, fd);
1699 encode_userdata(sqe, c, __CLOSE, 0, fd);
1700 return 0;
1701 }
1702
1703 /*
1704 * Final stage of a connection, the shutdown and close has finished. Mark
1705 * it as disconnected and let the main loop reap it.
1706 */
handle_close(struct io_uring * ring,struct io_uring_cqe * cqe)1707 static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
1708 {
1709 struct conn *c = cqe_to_conn(cqe);
1710 int fd = cqe_to_fd(cqe);
1711
1712 printf("Closed client: id=%d, in_fd=%d, out_fd=%d\n", c->tid, c->in_fd, c->out_fd);
1713 if (fd == c->in_fd)
1714 c->in_fd = -1;
1715 else if (fd == c->out_fd)
1716 c->out_fd = -1;
1717
1718 if (c->in_fd == -1 && c->out_fd == -1) {
1719 c->flags |= CONN_F_DISCONNECTED;
1720
1721 pthread_mutex_lock(&thread_lock);
1722 __show_stats(c);
1723 open_conns--;
1724 pthread_mutex_unlock(&thread_lock);
1725 free_buffer_rings(ring, c);
1726 free_msgs(&c->cd[0]);
1727 free_msgs(&c->cd[1]);
1728 free(c->cd[0].rcv_bucket);
1729 free(c->cd[0].snd_bucket);
1730 }
1731
1732 return 0;
1733 }
1734
handle_cancel(struct io_uring * ring,struct io_uring_cqe * cqe)1735 static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
1736 {
1737 struct conn *c = cqe_to_conn(cqe);
1738 int fd = cqe_to_fd(cqe);
1739
1740 c->pending_cancels--;
1741
1742 vlog("%d: got cancel fd %d, refs %d\n", c->tid, fd, c->pending_cancels);
1743
1744 if (!c->pending_cancels) {
1745 queue_shutdown_close(ring, c, c->in_fd);
1746 if (c->out_fd != -1)
1747 queue_shutdown_close(ring, c, c->out_fd);
1748 io_uring_submit(ring);
1749 }
1750
1751 return 0;
1752 }
1753
open_socket(struct conn * c)1754 static void open_socket(struct conn *c)
1755 {
1756 if (is_sink) {
1757 pthread_mutex_lock(&thread_lock);
1758 open_conns++;
1759 pthread_mutex_unlock(&thread_lock);
1760
1761 submit_receive(&c->ring, c);
1762 } else {
1763 struct io_uring_sqe *sqe;
1764 int domain;
1765
1766 if (ipv6)
1767 domain = AF_INET6;
1768 else
1769 domain = AF_INET;
1770
1771 /*
1772 * If fixed_files is set, proxy will use fixed files for any new
1773 * file descriptors it instantiates. Fixd files, or fixed
1774 * descriptors, are io_uring private file descriptors. They
1775 * cannot be accessed outside of io_uring. io_uring holds a
1776 * fixed reference to them, which means that we do not need to
1777 * grab per-request references to them. Particularly for
1778 * threaded applications, grabbing and dropping file references
1779 * for each operation can be costly as the file table is shared.
1780 * This generally shows up as fget/fput related overhead in any
1781 * workload profiles.
1782 *
1783 * Fixed descriptors are passed in via the 'fd' field just like
1784 * regular descriptors, and then marked as such by setting the
1785 * IOSQE_FIXED_FILE flag in the sqe->flags field. Some helpers
1786 * do that automatically, like the below, others will need it
1787 * set manually if they don't have a *direct*() helper.
1788 *
1789 * For operations that instantiate them, like the opening of a
1790 * direct socket, the application may either ask the kernel to
1791 * find a free one (as is done below), or the application may
1792 * manage the space itself and pass in an index for a currently
1793 * free slot in the table. If the kernel is asked to allocate a
1794 * free direct descriptor, note that io_uring does not abide by
1795 * the POSIX mandated "lowest free must be returned". It may
1796 * return any free descriptor of its choosing.
1797 */
1798 sqe = get_sqe(&c->ring);
1799 if (fixed_files)
1800 io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
1801 else
1802 io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
1803 encode_userdata(sqe, c, __SOCK, 0, 0);
1804 }
1805 }
1806
1807 /*
1808 * Start of connection, we got our in descriptor.
1809 */
handle_fd_pass(struct io_uring_cqe * cqe)1810 static int handle_fd_pass(struct io_uring_cqe *cqe)
1811 {
1812 struct conn *c = cqe_to_conn(cqe);
1813 int fd = cqe_to_fd(cqe);
1814
1815 vlog("%d: got fd pass %d\n", c->tid, fd);
1816 c->in_fd = fd;
1817 open_socket(c);
1818 return 0;
1819 }
1820
handle_stop(struct io_uring_cqe * cqe)1821 static int handle_stop(struct io_uring_cqe *cqe)
1822 {
1823 struct conn *c = cqe_to_conn(cqe);
1824
1825 printf("Client %d: queueing shutdown\n", c->tid);
1826 queue_cancel(&c->ring, c);
1827 return 0;
1828 }
1829
1830 /*
1831 * Called for each CQE that we receive. Decode the request type that it
1832 * came from, and call the appropriate handler.
1833 */
handle_cqe(struct io_uring * ring,struct io_uring_cqe * cqe)1834 static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
1835 {
1836 int ret;
1837
1838 /*
1839 * Unlikely, but there's an error in this CQE. If an error handler
1840 * is defined, call it, and that will deal with it. If no error
1841 * handler is defined, the opcode handler either doesn't care or will
1842 * handle it on its own.
1843 */
1844 if (cqe->res < 0) {
1845 struct error_handler *err = &error_handlers[cqe_to_op(cqe)];
1846
1847 if (err->error_fn)
1848 return err->error_fn(err, ring, cqe);
1849 }
1850
1851 switch (cqe_to_op(cqe)) {
1852 case __ACCEPT:
1853 ret = handle_accept(ring, cqe);
1854 break;
1855 case __SOCK:
1856 ret = handle_sock(ring, cqe);
1857 break;
1858 case __CONNECT:
1859 ret = handle_connect(ring, cqe);
1860 break;
1861 case __RECV:
1862 case __RECVMSG:
1863 ret = handle_recv(ring, cqe);
1864 break;
1865 case __SEND:
1866 case __SENDMSG:
1867 ret = handle_send(ring, cqe);
1868 break;
1869 case __CANCEL:
1870 ret = handle_cancel(ring, cqe);
1871 break;
1872 case __SHUTDOWN:
1873 ret = handle_shutdown(ring, cqe);
1874 break;
1875 case __CLOSE:
1876 ret = handle_close(ring, cqe);
1877 break;
1878 case __FD_PASS:
1879 ret = handle_fd_pass(cqe);
1880 break;
1881 case __STOP:
1882 ret = handle_stop(cqe);
1883 break;
1884 case __NOP:
1885 ret = 0;
1886 break;
1887 default:
1888 fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
1889 return 1;
1890 }
1891
1892 return ret;
1893 }
1894
house_keeping(struct io_uring * ring)1895 static void house_keeping(struct io_uring *ring)
1896 {
1897 static unsigned long last_bytes;
1898 unsigned long bytes, elapsed;
1899 struct conn *c;
1900 int i, j;
1901
1902 vlog("House keeping entered\n");
1903
1904 bytes = 0;
1905 for (i = 0; i < nr_conns; i++) {
1906 c = &conns[i];
1907
1908 for (j = 0; j < 2; j++) {
1909 struct conn_dir *cd = &c->cd[j];
1910
1911 bytes += cd->in_bytes + cd->out_bytes;
1912 }
1913 if (c->flags & CONN_F_DISCONNECTED) {
1914 vlog("%d: disconnected\n", i);
1915
1916 if (!(c->flags & CONN_F_REAPED)) {
1917 void *ret;
1918
1919 pthread_join(c->thread, &ret);
1920 c->flags |= CONN_F_REAPED;
1921 }
1922 continue;
1923 }
1924 if (c->flags & CONN_F_DISCONNECTING)
1925 continue;
1926
1927 if (should_shutdown(c)) {
1928 __close_conn(ring, c);
1929 c->flags |= CONN_F_DISCONNECTING;
1930 }
1931 }
1932
1933 elapsed = mtime_since_now(&last_housekeeping);
1934 if (bytes && elapsed >= 900) {
1935 unsigned long bw;
1936
1937 bw = (8 * (bytes - last_bytes) / 1000UL) / elapsed;
1938 if (bw) {
1939 if (open_conns)
1940 printf("Bandwidth (threads=%d): %'luMbit\n", open_conns, bw);
1941 gettimeofday(&last_housekeeping, NULL);
1942 last_bytes = bytes;
1943 }
1944 }
1945 }
1946
1947 /*
1948 * Event loop shared between the parent, and the connections. Could be
1949 * split in two, as they don't handle the same types of events. For the per
1950 * connection loop, 'c' is valid. For the main loop, it's NULL.
1951 */
__event_loop(struct io_uring * ring,struct conn * c)1952 static int __event_loop(struct io_uring *ring, struct conn *c)
1953 {
1954 struct __kernel_timespec active_ts, idle_ts;
1955 int flags;
1956
1957 idle_ts.tv_sec = 0;
1958 idle_ts.tv_nsec = 100000000LL;
1959 active_ts = idle_ts;
1960 if (wait_usec > 1000000) {
1961 active_ts.tv_sec = wait_usec / 1000000;
1962 wait_usec -= active_ts.tv_sec * 1000000;
1963 }
1964 active_ts.tv_nsec = wait_usec * 1000;
1965
1966 gettimeofday(&last_housekeeping, NULL);
1967
1968 flags = 0;
1969 while (1) {
1970 struct __kernel_timespec *ts = &idle_ts;
1971 struct io_uring_cqe *cqe;
1972 unsigned int head;
1973 int ret, i, to_wait;
1974
1975 /*
1976 * If wait_batch is set higher than 1, then we'll wait on
1977 * that amount of CQEs to be posted each loop. If used with
1978 * DEFER_TASKRUN, this can provide a substantial reduction
1979 * in context switch rate as the task isn't woken until the
1980 * requested number of events can be returned.
1981 *
1982 * Can be used with -t to set a wait_usec timeout as well.
1983 * For example, if an application can deal with 250 usec
1984 * of wait latencies, it can set -w8 -t250 which will cause
1985 * io_uring to return when either 8 events have been received,
1986 * or if 250 usec of waiting has passed.
1987 *
1988 * If we don't have any open connections, wait on just 1
1989 * always.
1990 */
1991 to_wait = 1;
1992 if (open_conns && !flags) {
1993 ts = &active_ts;
1994 to_wait = wait_batch;
1995 }
1996
1997 vlog("Submit and wait for %d\n", to_wait);
1998 ret = io_uring_submit_and_wait_timeout(ring, &cqe, to_wait, ts, NULL);
1999
2000 if (*ring->cq.koverflow)
2001 printf("overflow %u\n", *ring->cq.koverflow);
2002 if (*ring->sq.kflags & IORING_SQ_CQ_OVERFLOW)
2003 printf("saw overflow\n");
2004
2005 vlog("Submit and wait: %d\n", ret);
2006
2007 i = flags = 0;
2008 io_uring_for_each_cqe(ring, head, cqe) {
2009 if (handle_cqe(ring, cqe))
2010 return 1;
2011 flags |= cqe_to_conn(cqe)->flags;
2012 ++i;
2013 }
2014
2015 vlog("Handled %d events\n", i);
2016
2017 /*
2018 * Advance the CQ ring for seen events when we've processed
2019 * all of them in this loop. This can also be done with
2020 * io_uring_cqe_seen() in each handler above, which just marks
2021 * that single CQE as seen. However, it's more efficient to
2022 * mark a batch as seen when we're done with that batch.
2023 */
2024 if (i) {
2025 io_uring_cq_advance(ring, i);
2026 events += i;
2027 }
2028
2029 event_loops++;
2030 if (c) {
2031 if (c->flags & CONN_F_DISCONNECTED)
2032 break;
2033 } else {
2034 house_keeping(ring);
2035 }
2036 }
2037
2038 return 0;
2039 }
2040
2041 /*
2042 * Main event loop, Submit our multishot accept request, and then just loop
2043 * around handling incoming connections.
2044 */
parent_loop(struct io_uring * ring,int fd)2045 static int parent_loop(struct io_uring *ring, int fd)
2046 {
2047 struct io_uring_sqe *sqe;
2048
2049 /*
2050 * proxy provides a way to use either multishot receive or not, but
2051 * for accept, we always use multishot. A multishot accept request
2052 * needs only be armed once, and then it'll trigger a completion and
2053 * post a CQE whenever a new connection is accepted. No need to do
2054 * anything else, unless the multishot accept terminates. This happens
2055 * if it encounters an error. Applications should check for
2056 * IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
2057 * are expected from this request or not. Non-multishot never have
2058 * this set, where multishot will always have this set unless an error
2059 * occurs.
2060 */
2061 sqe = get_sqe(ring);
2062 if (fixed_files)
2063 io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
2064 else
2065 io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
2066 __encode_userdata(sqe, 0, __ACCEPT, 0, fd);
2067
2068 return __event_loop(ring, NULL);
2069 }
2070
init_ring(struct io_uring * ring,int nr_files)2071 static int init_ring(struct io_uring *ring, int nr_files)
2072 {
2073 struct io_uring_params params;
2074 int ret;
2075
2076 /*
2077 * By default, set us up with a big CQ ring. Not strictly needed
2078 * here, but it's very important to never overflow the CQ ring.
2079 * Events will not be dropped if this happens, but it does slow
2080 * the application down in dealing with overflown events.
2081 *
2082 * Set SINGLE_ISSUER, which tells the kernel that only one thread
2083 * is doing IO submissions. This enables certain optimizations in
2084 * the kernel.
2085 */
2086 memset(¶ms, 0, sizeof(params));
2087 params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
2088 params.flags |= IORING_SETUP_CQSIZE;
2089 params.cq_entries = 1024;
2090
2091 /*
2092 * If use_huge is set, setup the ring with IORING_SETUP_NO_MMAP. This
2093 * means that the application allocates the memory for the ring, and
2094 * the kernel maps it. The alternative is having the kernel allocate
2095 * the memory, and then liburing will mmap it. But we can't really
2096 * support huge pages that way. If this fails, then ensure that the
2097 * system has huge pages set aside upfront.
2098 */
2099 if (use_huge)
2100 params.flags |= IORING_SETUP_NO_MMAP;
2101
2102 /*
2103 * DEFER_TASKRUN decouples async event reaping and retrying from
2104 * regular system calls. If this isn't set, then io_uring uses
2105 * normal task_work for this. task_work is always being run on any
2106 * exit to userspace. Real applications do more than just call IO
2107 * related system calls, and hence we can be running this work way
2108 * too often. Using DEFER_TASKRUN defers any task_work running to
2109 * when the application enters the kernel anyway to wait on new
2110 * events. It's generally the preferred and recommended way to setup
2111 * a ring.
2112 */
2113 if (defer_tw) {
2114 params.flags |= IORING_SETUP_DEFER_TASKRUN;
2115 sqpoll = 0;
2116 }
2117
2118 /*
2119 * SQPOLL offloads any request submission and retry operations to a
2120 * dedicated thread. This enables an application to do IO without
2121 * ever having to enter the kernel itself. The SQPOLL thread will
2122 * stay busy as long as there's work to do, and go to sleep if
2123 * sq_thread_idle msecs have passed. If it's running, submitting new
2124 * IO just needs to make them visible to the SQPOLL thread, it needs
2125 * not enter the kernel. For submission, the application will only
2126 * enter the kernel if the SQPOLL has been idle long enough that it
2127 * has gone to sleep.
2128 *
2129 * Waiting on events still need to enter the kernel, if none are
2130 * available. The application may also use io_uring_peek_cqe() to
2131 * check for new events without entering the kernel, as completions
2132 * will be continually produced to the CQ ring by the SQPOLL thread
2133 * as they occur.
2134 */
2135 if (sqpoll) {
2136 params.flags |= IORING_SETUP_SQPOLL;
2137 params.sq_thread_idle = 1000;
2138 defer_tw = 0;
2139 }
2140
2141 /*
2142 * If neither DEFER_TASKRUN or SQPOLL is used, set COOP_TASKRUN. This
2143 * avoids heavy signal based notifications, which can force an
2144 * application to enter the kernel and process it as soon as they
2145 * occur.
2146 */
2147 if (!sqpoll && !defer_tw)
2148 params.flags |= IORING_SETUP_COOP_TASKRUN;
2149
2150 /*
2151 * The SQ ring size need not be larger than any batch of requests
2152 * that need to be prepared before submit. Normally in a loop we'd
2153 * only need a few, if any, particularly if multishot is used.
2154 */
2155 ret = io_uring_queue_init_params(ring_size, ring, ¶ms);
2156 if (ret) {
2157 fprintf(stderr, "%s\n", strerror(-ret));
2158 return 1;
2159 }
2160
2161 /*
2162 * If send serialization is available and no option was given to use
2163 * it or not, default it to on. If it was turned on and the kernel
2164 * doesn't support it, turn it off.
2165 */
2166 if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
2167 if (send_ring == -1)
2168 send_ring = 1;
2169 } else {
2170 if (send_ring == 1) {
2171 fprintf(stderr, "Kernel doesn't support ring provided "
2172 "buffers for sends, disabled\n");
2173 }
2174 send_ring = 0;
2175 }
2176
2177 if (!send_ring && snd_bundle) {
2178 fprintf(stderr, "Can't use send bundle without send_ring\n");
2179 snd_bundle = 0;
2180 }
2181
2182 if (fixed_files) {
2183 /*
2184 * If fixed files are used, we need to allocate a fixed file
2185 * table upfront where new direct descriptors can be managed.
2186 */
2187 ret = io_uring_register_files_sparse(ring, nr_files);
2188 if (ret) {
2189 fprintf(stderr, "file register: %d\n", ret);
2190 return 1;
2191 }
2192
2193 /*
2194 * If fixed files are used, we also register the ring fd. See
2195 * comment near io_uring_prep_socket_direct_alloc() further
2196 * down. This avoids the fget/fput overhead associated with
2197 * the io_uring_enter(2) system call itself, which is used to
2198 * submit and wait on events.
2199 */
2200 ret = io_uring_register_ring_fd(ring);
2201 if (ret != 1) {
2202 fprintf(stderr, "ring register: %d\n", ret);
2203 return 1;
2204 }
2205 }
2206
2207 if (napi) {
2208 struct io_uring_napi n = {
2209 .prefer_busy_poll = napi > 1 ? 1 : 0,
2210 .busy_poll_to = napi_timeout,
2211 };
2212
2213 ret = io_uring_register_napi(ring, &n);
2214 if (ret) {
2215 fprintf(stderr, "io_uring_register_napi: %d\n", ret);
2216 if (ret != -EINVAL)
2217 return 1;
2218 fprintf(stderr, "NAPI not available, turned off\n");
2219 }
2220 }
2221
2222 return 0;
2223 }
2224
thread_main(void * data)2225 static void *thread_main(void *data)
2226 {
2227 struct conn *c = data;
2228 int ret;
2229
2230 c->flags |= CONN_F_STARTED;
2231
2232 /* we need a max of 4 descriptors for each client */
2233 ret = init_ring(&c->ring, 4);
2234 if (ret)
2235 goto done;
2236
2237 if (setup_buffer_rings(&c->ring, c))
2238 goto done;
2239
2240 /*
2241 * If we're using fixed files, then we need to wait for the parent
2242 * to install the c->in_fd into our direct descriptor table. When
2243 * that happens, we'll set things up. If we're not using fixed files,
2244 * we can set up the receive or connect now.
2245 */
2246 if (!fixed_files)
2247 open_socket(c);
2248
2249 /* we're ready */
2250 pthread_barrier_wait(&c->startup_barrier);
2251
2252 __event_loop(&c->ring, c);
2253 done:
2254 return NULL;
2255 }
2256
usage(const char * name)2257 static void usage(const char *name)
2258 {
2259 printf("%s:\n", name);
2260 printf("\t-m:\t\tUse multishot receive (%d)\n", recv_mshot);
2261 printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
2262 printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
2263 printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
2264 printf("\t-a:\t\tUse huge pages for the ring (%d)\n", use_huge);
2265 printf("\t-t:\t\tTimeout for waiting on CQEs (usec) (%d)\n", wait_usec);
2266 printf("\t-w:\t\tNumber of CQEs to wait for each loop (%d)\n", wait_batch);
2267 printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
2268 printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
2269 printf("\t-q:\t\tRing size to use (%d)\n", ring_size);
2270 printf("\t-H:\t\tHost to connect to (%s)\n", host);
2271 printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
2272 printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
2273 printf("\t-6:\t\tUse IPv6 (%d)\n", ipv6);
2274 printf("\t-N:\t\tUse NAPI polling (%d)\n", napi);
2275 printf("\t-T:\t\tNAPI timeout (usec) (%d)\n", napi_timeout);
2276 printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
2277 printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
2278 printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring);
2279 printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle);
2280 printf("\t-z:\t\tUse zerocopy send (%d)\n", snd_zc);
2281 printf("\t-c:\t\tUse bundles for recv (%d)\n", snd_bundle);
2282 printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg);
2283 printf("\t-M:\t\tUse recvmsg (%d)\n", rcv_msg);
2284 printf("\t-x:\t\tShow extended stats (%d)\n", ext_stat);
2285 printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
2286 }
2287
2288 /*
2289 * Options parsing the ring / net setup
2290 */
main(int argc,char * argv[])2291 int main(int argc, char *argv[])
2292 {
2293 struct io_uring ring;
2294 struct sigaction sa = { };
2295 const char *optstring;
2296 int opt, ret, fd;
2297
2298 setlocale(LC_NUMERIC, "en_US");
2299
2300 page_size = sysconf(_SC_PAGESIZE);
2301 if (page_size < 0) {
2302 perror("sysconf(_SC_PAGESIZE)");
2303 return 1;
2304 }
2305
2306 pthread_mutex_init(&thread_lock, NULL);
2307
2308 optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:z:6Vh?";
2309 while ((opt = getopt(argc, argv, optstring)) != -1) {
2310 switch (opt) {
2311 case 'm':
2312 recv_mshot = !!atoi(optarg);
2313 break;
2314 case 'S':
2315 sqpoll = !!atoi(optarg);
2316 break;
2317 case 'd':
2318 defer_tw = !!atoi(optarg);
2319 break;
2320 case 'b':
2321 buf_size = atoi(optarg);
2322 break;
2323 case 'n':
2324 nr_bufs = atoi(optarg);
2325 break;
2326 case 'u':
2327 send_ring = !!atoi(optarg);
2328 break;
2329 case 'c':
2330 rcv_bundle = !!atoi(optarg);
2331 break;
2332 case 'C':
2333 snd_bundle = !!atoi(optarg);
2334 break;
2335 case 'w':
2336 wait_batch = atoi(optarg);
2337 break;
2338 case 't':
2339 wait_usec = atoi(optarg);
2340 break;
2341 case 's':
2342 is_sink = !!atoi(optarg);
2343 break;
2344 case 'f':
2345 fixed_files = !!atoi(optarg);
2346 break;
2347 case 'H':
2348 host = strdup(optarg);
2349 break;
2350 case 'r':
2351 receive_port = atoi(optarg);
2352 break;
2353 case 'p':
2354 send_port = atoi(optarg);
2355 break;
2356 case 'B':
2357 bidi = !!atoi(optarg);
2358 break;
2359 case 'N':
2360 napi = !!atoi(optarg);
2361 break;
2362 case 'T':
2363 napi_timeout = atoi(optarg);
2364 break;
2365 case '6':
2366 ipv6 = true;
2367 break;
2368 case 'M':
2369 snd_msg = !!atoi(optarg);
2370 break;
2371 case 'z':
2372 snd_zc = !!atoi(optarg);
2373 break;
2374 case 'R':
2375 rcv_msg = !!atoi(optarg);
2376 break;
2377 case 'q':
2378 ring_size = atoi(optarg);
2379 break;
2380 case 'a':
2381 use_huge = !!atoi(optarg);
2382 break;
2383 case 'x':
2384 ext_stat = !!atoi(optarg);
2385 break;
2386 case 'V':
2387 verbose++;
2388 break;
2389 case 'h':
2390 default:
2391 usage(argv[0]);
2392 return 1;
2393 }
2394 }
2395
2396 if (bidi && is_sink) {
2397 fprintf(stderr, "Can't be both bidi proxy and sink\n");
2398 return 1;
2399 }
2400 if (snd_msg && sqpoll) {
2401 fprintf(stderr, "SQPOLL with msg variants disabled\n");
2402 snd_msg = 0;
2403 }
2404 if (rcv_msg && rcv_bundle) {
2405 fprintf(stderr, "Can't use bundles with recvmsg\n");
2406 rcv_msg = 0;
2407 }
2408 if (snd_msg && snd_bundle) {
2409 fprintf(stderr, "Can't use bundles with sendmsg\n");
2410 snd_msg = 0;
2411 }
2412 if (snd_msg && send_ring) {
2413 fprintf(stderr, "Can't use send ring sendmsg\n");
2414 snd_msg = 0;
2415 }
2416 if (snd_zc && (send_ring || snd_bundle)) {
2417 fprintf(stderr, "Can't use send zc with bundles or ring\n");
2418 send_ring = snd_bundle = 0;
2419 }
2420 /*
2421 * For recvmsg w/multishot, we waste some data at the head of the
2422 * packet every time. Adjust the buffer size to account for that,
2423 * so we're still handing 'buf_size' actual payload of data.
2424 */
2425 if (rcv_msg && recv_mshot) {
2426 fprintf(stderr, "Adjusted buf size for recvmsg w/multishot\n");
2427 buf_size += sizeof(struct io_uring_recvmsg_out);
2428 }
2429
2430 br_mask = nr_bufs - 1;
2431
2432 fd = setup_listening_socket(receive_port, ipv6);
2433 if (is_sink)
2434 send_port = -1;
2435
2436 if (fd == -1)
2437 return 1;
2438
2439 atexit(show_stats);
2440 sa.sa_handler = sig_int;
2441 sa.sa_flags = SA_RESTART;
2442 sigaction(SIGINT, &sa, NULL);
2443
2444 ret = init_ring(&ring, MAX_CONNS * 3);
2445 if (ret)
2446 return ret;
2447
2448 printf("Backend: sqpoll=%d, defer_tw=%d, fixed_files=%d, "
2449 "is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d, "
2450 "receive_port=%d, napi=%d, napi_timeout=%d, huge_page=%d\n",
2451 sqpoll, defer_tw, fixed_files, is_sink,
2452 buf_size, nr_bufs, host, send_port, receive_port,
2453 napi, napi_timeout, use_huge);
2454 printf(" recv options: recvmsg=%d, recv_mshot=%d, recv_bundle=%d\n",
2455 rcv_msg, recv_mshot, rcv_bundle);
2456 printf(" send options: sendmsg=%d, send_ring=%d, send_bundle=%d, "
2457 "send_zerocopy=%d\n", snd_msg, send_ring, snd_bundle,
2458 snd_zc);
2459
2460 return parent_loop(&ring, fd);
2461 }
2462