1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Sample program that can act either as a packet sink, where it just receives
4 * packets and doesn't do anything with them, or it can act as a proxy where it
5 * receives packets and then sends them to a new destination. The proxy can
 * be unidirectional (-B0), or bi-directional (-B1).
7 *
8 * Examples:
9 *
10 * Act as a proxy, listening on port 4444, and send data to 192.168.2.6 on port
11 * 4445. Use multishot receive, DEFER_TASKRUN, and fixed files
12 *
13 * ./proxy -m1 -r4444 -H 192.168.2.6 -p4445
14 *
15 * Same as above, but utilize send bundles (-C1, requires -u1 send_ring) as well
16 * with ring provided send buffers, and recv bundles (-c1).
17 *
18 * ./proxy -m1 -c1 -u1 -C1 -r4444 -H 192.168.2.6 -p4445
19 *
20 * Act as a bi-directional proxy, listening on port 8888, and send data back
21 * and forth between host and 192.168.2.6 on port 22. Use multishot receive,
22 * DEFER_TASKRUN, fixed files, and buffers of size 1500.
23 *
24 * ./proxy -m1 -B1 -b1500 -r8888 -H 192.168.2.6 -p22
25 *
 * Act as a sink, listening on port 4445, using multishot receive, DEFER_TASKRUN,
27 * and fixed files:
28 *
29 * ./proxy -m1 -s1 -r4445
30 *
31 * Run with -h to see a list of options, and their defaults.
32 *
33 * (C) 2024 Jens Axboe <axboe@kernel.dk>
34 *
35 */
36 #include <fcntl.h>
37 #include <stdint.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <arpa/inet.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #include <sys/socket.h>
45 #include <sys/time.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48 #include <linux/mman.h>
49 #include <locale.h>
50 #include <assert.h>
51 #include <pthread.h>
52 #include <liburing.h>
53
54 #include "proxy.h"
55 #include "helpers.h"
56
/*
 * Fallback definitions for headers that do not provide the bundle bits.
 * These can be dropped once/if bundles are upstreamed and the generic
 * definitions live in the kernel header.
 */
#if !defined(IORING_RECVSEND_BUNDLE)
#define IORING_RECVSEND_BUNDLE		(1U << 4)
#endif
#if !defined(IORING_FEAT_SEND_BUF_SELECT)
#define IORING_FEAT_SEND_BUF_SELECT	(1U << 14)
#endif
67
/* next buffer group ID to hand out, consumed by setup_buffer_rings() */
static int cur_bgid = 1;
/* number of conns[] slots handed out by handle_accept() */
static int nr_conns;
/* bumped under thread_lock by handle_connect() */
static int open_conns;
/* alignment used for the non-huge receive buffer allocation */
static long page_size;

/* global counters printed by show_stats() */
static unsigned long event_loops;
static unsigned long events;

/*
 * Run-time settings and their built-in defaults.
 * NOTE(review): presumably overridden by option parsing elsewhere in the
 * file; run with -h for the option list.
 */
static int recv_mshot = 1;
static int sqpoll;
static int defer_tw = 1;
static int is_sink;
static int fixed_files = 1;
static char *host = "192.168.3.2";
static int send_port = 4445;
static int receive_port = 4444;
static int buf_size = 32;
static int buf_ring_inc;
static int bidi;
static int ipv6;
static int napi;
static int napi_timeout;
static int wait_batch = 1;
static int wait_usec = 1000000;
static int rcv_msg;
static int snd_msg;
static int snd_zc;
/* NOTE(review): starts at -1, which looks like "not decided yet" - confirm */
static int send_ring = -1;
static int snd_bundle;
static int rcv_bundle;
static int use_huge;
static int ext_stat;
static int verbose;

/* number of buffers in each provided buffer ring */
static int nr_bufs = 256;
/* mask passed to io_uring_buf_ring_add() */
static int br_mask;

static int ring_size = 128;

/* protects open_conns */
static pthread_mutex_t thread_lock;
static struct timeval last_housekeeping;
109
/*
 * iovec array used for sendmsg/recvmsg. recvmsg only needs a single vec.
 * sendmsg keeps two: one that has been submitted and is being sent, and one
 * that is being filled for the next submission; they are swapped whenever a
 * new sendmsg is issued. Plain send does not pass the iovec to the kernel,
 * but still uses the vec to serialize sends and avoid reordering.
 */
struct msg_vec {
	/* the iovec array itself */
	struct iovec *iov;
	/* number of entries allocated in 'iov' */
	int vec_size;
	/* number of entries currently in use */
	int iov_len;
	/* send only: index of the entry currently being processed */
	int cur_iov;
};
126
127 struct io_msg {
128 struct msghdr msg;
129 struct msg_vec vecs[2];
130 /* current msg_vec being prepared */
131 int vec_index;
132 };
133
134 /*
135 * Per socket stats per connection. For bi-directional, we'll have both
136 * sends and receives on each socket, this helps track them separately.
137 * For sink or one directional, each of the two stats will be only sends
138 * or receives, not both.
139 */
140 struct conn_dir {
141 int index;
142
143 int pending_shutdown;
144 int pending_send;
145 int pending_recv;
146
147 int snd_notif;
148
149 int out_buffers;
150
151 int rcv, rcv_shrt, rcv_enobufs, rcv_mshot;
152 int snd, snd_shrt, snd_enobufs, snd_busy, snd_mshot;
153
154 int snd_next_bid;
155 int rcv_next_bid;
156
157 int *rcv_bucket;
158 int *snd_bucket;
159
160 unsigned long in_bytes, out_bytes;
161
162 /* only ever have a single recv pending */
163 struct io_msg io_rcv_msg;
164
165 /* one send that is inflight, and one being prepared for the next one */
166 struct io_msg io_snd_msg;
167 };
168
/* bits for conn->flags */
enum {
	CONN_F_STARTED		= 1 << 0,
	CONN_F_DISCONNECTING	= 1 << 1,
	CONN_F_DISCONNECTED	= 1 << 2,
	CONN_F_PENDING_SHUTDOWN	= 1 << 3,
	CONN_F_STATS_SHOWN	= 1 << 4,
	CONN_F_END_TIME		= 1 << 5,
	CONN_F_REAPED		= 1 << 6,
};
178
/*
 * A provided buffer ring owned by one connection.
 */
struct conn_buf_ring {
	/* the registered ring, NULL when not set up */
	struct io_uring_buf_ring *br;
	/* backing memory for the buffers */
	void *buf;
	/* buffer group ID the ring is registered under */
	int bgid;
};
187
188 struct conn {
189 struct io_uring ring;
190
191 /* receive side buffer ring, new data arrives here */
192 struct conn_buf_ring in_br;
193 /* if send_ring is used, outgoing data to send */
194 struct conn_buf_ring out_br;
195
196 int tid;
197 int in_fd, out_fd;
198 int pending_cancels;
199 int flags;
200
201 struct conn_dir cd[2];
202
203 struct timeval start_time, end_time;
204
205 union {
206 struct sockaddr_in addr;
207 struct sockaddr_in6 addr6;
208 };
209
210 pthread_t thread;
211 pthread_barrier_t startup_barrier;
212 };
213
214 #define MAX_CONNS 1024
215 static struct conn conns[MAX_CONNS];
216
/* printf() that only produces output when 'verbose' is set */
#define vlog(str, ...)					\
	do {						\
		if (verbose)				\
			printf(str, ##__VA_ARGS__);	\
	} while (0)
221
/* defined later in the file, needed before their definitions */
static int prep_next_send(struct io_uring *ring, struct conn *c,
			  struct conn_dir *cd, int fd);
static void *thread_main(void *data);
225
cqe_to_conn(struct io_uring_cqe * cqe)226 static struct conn *cqe_to_conn(struct io_uring_cqe *cqe)
227 {
228 struct userdata ud = { .val = cqe->user_data };
229
230 return &conns[ud.op_tid & TID_MASK];
231 }
232
cqe_to_conn_dir(struct conn * c,struct io_uring_cqe * cqe)233 static struct conn_dir *cqe_to_conn_dir(struct conn *c,
234 struct io_uring_cqe *cqe)
235 {
236 int fd = cqe_to_fd(cqe);
237
238 return &c->cd[fd != c->in_fd];
239 }
240
other_dir_fd(struct conn * c,int fd)241 static int other_dir_fd(struct conn *c, int fd)
242 {
243 if (c->in_fd == fd)
244 return c->out_fd;
245 return c->in_fd;
246 }
247
248 /* currently active msg_vec */
msg_vec(struct io_msg * imsg)249 static struct msg_vec *msg_vec(struct io_msg *imsg)
250 {
251 return &imsg->vecs[imsg->vec_index];
252 }
253
snd_msg_vec(struct conn_dir * cd)254 static struct msg_vec *snd_msg_vec(struct conn_dir *cd)
255 {
256 return msg_vec(&cd->io_snd_msg);
257 }
258
/*
 * Operation codes. The flow is: accept a new connection -> create a socket
 * -> connect to the end point -> prepare recv -> on receive, do a send
 * (unless acting as a sink). If either end disconnects, we transition to
 * shutdown and then close.
 *
 * The values line up with the entries of error_handlers[].
 */
enum {
	__ACCEPT	= 1,
	__SOCK		= 2,
	__CONNECT	= 3,
	__RECV		= 4,
	__RECVMSG	= 5,
	__SEND		= 6,
	__SENDMSG	= 7,
	__SHUTDOWN	= 8,
	__CANCEL	= 9,
	__CLOSE		= 10,
	__FD_PASS	= 11,
	__NOP		= 12,
	__STOP		= 13,
};
279
/*
 * One entry per operation code: a printable name and an optional handler
 * that is given the failing completion.
 */
struct error_handler {
	const char *name;
	int (*error_fn)(struct error_handler *, struct io_uring *,
			struct io_uring_cqe *);
};

/* handlers defined later in the file but referenced by error_handlers[] */
static int recv_error(struct error_handler *err, struct io_uring *ring,
		      struct io_uring_cqe *cqe);
static int send_error(struct error_handler *err, struct io_uring *ring,
		      struct io_uring_cqe *cqe);
289
default_error(struct error_handler * err,struct io_uring * ring,struct io_uring_cqe * cqe)290 static int default_error(struct error_handler *err,
291 struct io_uring __attribute__((__unused__)) *ring,
292 struct io_uring_cqe *cqe)
293 {
294 struct conn *c = cqe_to_conn(cqe);
295
296 fprintf(stderr, "%d: %s error %s\n", c->tid, err->name, strerror(-cqe->res));
297 fprintf(stderr, "fd=%d, bid=%d\n", cqe_to_fd(cqe), cqe_to_bid(cqe));
298 return 1;
299 }
300
301 /*
302 * Move error handling out of the normal handling path, cleanly separating
303 * them. If an opcode doesn't need any error handling, set it to NULL. If
304 * it wants to stop the connection at that point and not do anything else,
305 * then the default handler can be used. Only receive has proper error
306 * handling, as we can get -ENOBUFS which is not a fatal condition. It just
307 * means we need to wait on buffer replenishing before re-arming the receive.
308 */
309 static struct error_handler error_handlers[] = {
310 { .name = "NULL", .error_fn = NULL, },
311 { .name = "ACCEPT", .error_fn = default_error, },
312 { .name = "SOCK", .error_fn = default_error, },
313 { .name = "CONNECT", .error_fn = default_error, },
314 { .name = "RECV", .error_fn = recv_error, },
315 { .name = "RECVMSG", .error_fn = recv_error, },
316 { .name = "SEND", .error_fn = send_error, },
317 { .name = "SENDMSG", .error_fn = send_error, },
318 { .name = "SHUTDOWN", .error_fn = NULL, },
319 { .name = "CANCEL", .error_fn = NULL, },
320 { .name = "CLOSE", .error_fn = NULL, },
321 { .name = "FD_PASS", .error_fn = default_error, },
322 { .name = "NOP", .error_fn = NULL, },
323 { .name = "STOP", .error_fn = default_error, },
324 };
325
free_buffer_ring(struct io_uring * ring,struct conn_buf_ring * cbr)326 static void free_buffer_ring(struct io_uring *ring, struct conn_buf_ring *cbr)
327 {
328 if (!cbr->br)
329 return;
330
331 io_uring_free_buf_ring(ring, cbr->br, nr_bufs, cbr->bgid);
332 cbr->br = NULL;
333 if (use_huge)
334 munmap(cbr->buf, buf_size * nr_bufs);
335 else
336 free(cbr->buf);
337 }
338
free_buffer_rings(struct io_uring * ring,struct conn * c)339 static void free_buffer_rings(struct io_uring *ring, struct conn *c)
340 {
341 free_buffer_ring(ring, &c->in_br);
342 free_buffer_ring(ring, &c->out_br);
343 }
344
345 /*
346 * Setup a ring provided buffer ring for each connection. If we get -ENOBUFS
347 * on receive, for multishot receive we'll wait for half the provided buffers
348 * to be returned by pending sends, then re-arm the multishot receive. If
349 * this happens too frequently (see enobufs= stat), then the ring size is
350 * likely too small. Use -nXX to make it bigger. See recv_enobufs().
351 *
352 * The alternative here would be to use the older style provided buffers,
353 * where you simply setup a buffer group and use SQEs with
354 * io_urign_prep_provide_buffers() to add to the pool. But that approach is
355 * slower and has been deprecated by using the faster ring provided buffers.
356 */
setup_recv_ring(struct io_uring * ring,struct conn * c)357 static int setup_recv_ring(struct io_uring *ring, struct conn *c)
358 {
359 struct conn_buf_ring *cbr = &c->in_br;
360 int br_flags = 0;
361 int ret, i;
362 size_t len;
363 void *ptr;
364
365 len = buf_size * nr_bufs;
366 if (use_huge) {
367 cbr->buf = mmap(NULL, len, PROT_READ|PROT_WRITE,
368 MAP_PRIVATE|MAP_HUGETLB|MAP_HUGE_2MB|MAP_ANONYMOUS,
369 -1, 0);
370 if (cbr->buf == MAP_FAILED) {
371 perror("mmap");
372 return 1;
373 }
374 } else {
375 if (posix_memalign(&cbr->buf, page_size, len)) {
376 perror("posix memalign");
377 return 1;
378 }
379 }
380 if (buf_ring_inc)
381 br_flags = IOU_PBUF_RING_INC;
382 cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, br_flags, &ret);
383 if (!cbr->br) {
384 fprintf(stderr, "Buffer ring register failed %d\n", ret);
385 return 1;
386 }
387
388 ptr = cbr->buf;
389 for (i = 0; i < nr_bufs; i++) {
390 vlog("%d: add bid %d, data %p\n", c->tid, i, ptr);
391 io_uring_buf_ring_add(cbr->br, ptr, buf_size, i, br_mask, i);
392 ptr += buf_size;
393 }
394 io_uring_buf_ring_advance(cbr->br, nr_bufs);
395 printf("%d: recv buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
396 return 0;
397 }
398
399 /*
400 * If 'send_ring' is used and the kernel supports it, we can skip serializing
401 * sends as the data will be ordered regardless. This reduces the send handling
402 * complexity, as buffers can always be added to the outgoing ring and will be
403 * processed in the order in which they were added.
404 */
setup_send_ring(struct io_uring * ring,struct conn * c)405 static int setup_send_ring(struct io_uring *ring, struct conn *c)
406 {
407 struct conn_buf_ring *cbr = &c->out_br;
408 int br_flags = 0;
409 int ret;
410
411 if (buf_ring_inc)
412 br_flags = IOU_PBUF_RING_INC;
413 cbr->br = io_uring_setup_buf_ring(ring, nr_bufs, cbr->bgid, br_flags, &ret);
414 if (!cbr->br) {
415 fprintf(stderr, "Buffer ring register failed %d\n", ret);
416 return 1;
417 }
418
419 printf("%d: send buffer ring bgid %d, bufs %d\n", c->tid, cbr->bgid, nr_bufs);
420 return 0;
421 }
422
setup_send_zc(struct io_uring * ring,struct conn * c)423 static int setup_send_zc(struct io_uring *ring, struct conn *c)
424 {
425 struct iovec *iovs;
426 void *buf;
427 int i, ret;
428
429 if (snd_msg)
430 return 0;
431
432 buf = c->in_br.buf;
433 iovs = calloc(nr_bufs, sizeof(struct iovec));
434 for (i = 0; i < nr_bufs; i++) {
435 iovs[i].iov_base = buf;
436 iovs[i].iov_len = buf_size;
437 buf += buf_size;
438 }
439
440 ret = io_uring_register_buffers(ring, iovs, nr_bufs);
441 if (ret) {
442 fprintf(stderr, "failed registering buffers: %d\n", ret);
443 free(iovs);
444 return ret;
445 }
446 free(iovs);
447 return 0;
448 }
449
450 /*
451 * Setup an input and output buffer ring.
452 */
setup_buffer_rings(struct io_uring * ring,struct conn * c)453 static int setup_buffer_rings(struct io_uring *ring, struct conn *c)
454 {
455 int ret;
456
457 /* no locking needed on cur_bgid, parent serializes setup */
458 c->in_br.bgid = cur_bgid++;
459 c->out_br.bgid = cur_bgid++;
460 c->out_br.br = NULL;
461
462 ret = setup_recv_ring(ring, c);
463 if (ret)
464 return ret;
465 if (is_sink)
466 return 0;
467 if (snd_zc) {
468 ret = setup_send_zc(ring, c);
469 if (ret)
470 return ret;
471 }
472 if (send_ring) {
473 ret = setup_send_ring(ring, c);
474 if (ret) {
475 free_buffer_ring(ring, &c->in_br);
476 return ret;
477 }
478 }
479
480 return 0;
481 }
482
/* how many times ('count') a recv/send covered 'nr_packets' buffers */
struct bucket_stat {
	int nr_packets;
	int count;
};

/*
 * qsort() comparison callback for struct bucket_stat. Orders by 'count'
 * in descending order: returns 1 if p1 has the smaller count, -1 if it has
 * the larger one, and 0 if they are equal.
 */
static int stat_cmp(const void *p1, const void *p2)
{
	const struct bucket_stat *lhs = p1, *rhs = p2;

	return (lhs->count < rhs->count) - (lhs->count > rhs->count);
}
499
show_buckets(struct conn_dir * cd)500 static void show_buckets(struct conn_dir *cd)
501 {
502 unsigned long snd_total, rcv_total;
503 struct bucket_stat *rstat, *sstat;
504 int i;
505
506 if (!cd->rcv_bucket || !cd->snd_bucket)
507 return;
508
509 rstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
510 sstat = calloc(nr_bufs + 1, sizeof(struct bucket_stat));
511
512 snd_total = rcv_total = 0;
513 for (i = 0; i <= nr_bufs; i++) {
514 snd_total += cd->snd_bucket[i];
515 sstat[i].nr_packets = i;
516 sstat[i].count = cd->snd_bucket[i];
517 rcv_total += cd->rcv_bucket[i];
518 rstat[i].nr_packets = i;
519 rstat[i].count = cd->rcv_bucket[i];
520 }
521
522 if (!snd_total && !rcv_total) {
523 free(sstat);
524 free(rstat);
525 }
526 if (snd_total)
527 qsort(sstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
528 if (rcv_total)
529 qsort(rstat, nr_bufs, sizeof(struct bucket_stat), stat_cmp);
530
531 printf("\t Packets per recv/send:\n");
532 for (i = 0; i <= nr_bufs; i++) {
533 double snd_prc = 0.0, rcv_prc = 0.0;
534 if (!rstat[i].count && !sstat[i].count)
535 continue;
536 if (rstat[i].count)
537 rcv_prc = 100.0 * (rstat[i].count / (double) rcv_total);
538 if (sstat[i].count)
539 snd_prc = 100.0 * (sstat[i].count / (double) snd_total);
540 printf("\t bucket(%3d/%3d): rcv=%u (%.2f%%) snd=%u (%.2f%%)\n",
541 rstat[i].nr_packets, sstat[i].nr_packets,
542 rstat[i].count, rcv_prc,
543 sstat[i].count, snd_prc);
544 }
545
546 free(sstat);
547 free(rstat);
548 }
549
__show_stats(struct conn * c)550 static void __show_stats(struct conn *c)
551 {
552 unsigned long msec, qps;
553 unsigned long bytes, bw;
554 struct conn_dir *cd;
555 int i;
556
557 if (c->flags & (CONN_F_STATS_SHOWN | CONN_F_REAPED))
558 return;
559 if (!(c->flags & CONN_F_STARTED))
560 return;
561
562 if (!(c->flags & CONN_F_END_TIME))
563 gettimeofday(&c->end_time, NULL);
564
565 msec = (c->end_time.tv_sec - c->start_time.tv_sec) * 1000;
566 msec += (c->end_time.tv_usec - c->start_time.tv_usec) / 1000;
567
568 qps = 0;
569 for (i = 0; i < 2; i++)
570 qps += c->cd[i].rcv + c->cd[i].snd;
571
572 if (!qps)
573 return;
574
575 if (msec)
576 qps = (qps * 1000) / msec;
577
578 printf("Conn %d/(in_fd=%d, out_fd=%d): qps=%lu, msec=%lu\n", c->tid,
579 c->in_fd, c->out_fd, qps, msec);
580
581 bytes = 0;
582 for (i = 0; i < 2; i++) {
583 cd = &c->cd[i];
584
585 if (!cd->in_bytes && !cd->out_bytes && !cd->snd && !cd->rcv)
586 continue;
587
588 bytes += cd->in_bytes;
589 bytes += cd->out_bytes;
590
591 printf("\t%3d: rcv=%u (short=%u, enobufs=%d), snd=%u (short=%u,"
592 " busy=%u, enobufs=%d)\n", i, cd->rcv, cd->rcv_shrt,
593 cd->rcv_enobufs, cd->snd, cd->snd_shrt, cd->snd_busy,
594 cd->snd_enobufs);
595 printf("\t : in_bytes=%lu (Kb %lu), out_bytes=%lu (Kb %lu)\n",
596 cd->in_bytes, cd->in_bytes >> 10,
597 cd->out_bytes, cd->out_bytes >> 10);
598 printf("\t : mshot_rcv=%d, mshot_snd=%d\n", cd->rcv_mshot,
599 cd->snd_mshot);
600 show_buckets(cd);
601
602 }
603 if (msec) {
604 bytes *= 8UL;
605 bw = bytes / 1000;
606 bw /= msec;
607 printf("\tBW=%'luMbit\n", bw);
608 }
609
610 c->flags |= CONN_F_STATS_SHOWN;
611 }
612
show_stats(void)613 static void show_stats(void)
614 {
615 float events_per_loop = 0.0;
616 static int stats_shown;
617 int i;
618
619 if (stats_shown)
620 return;
621
622 if (events)
623 events_per_loop = (float) events / (float) event_loops;
624
625 printf("Event loops: %lu, events %lu, events per loop %.2f\n", event_loops,
626 events, events_per_loop);
627
628 for (i = 0; i < MAX_CONNS; i++) {
629 struct conn *c = &conns[i];
630
631 __show_stats(c);
632 }
633 stats_shown = 1;
634 }
635
/*
 * Signal handler: emit a newline, dump the statistics and exit with
 * status 1. The signal number is not used.
 */
static void sig_int(int __attribute__((__unused__)) sig)
{
	printf("\n");
	show_stats();

	exit(1);
}
642
643 /*
644 * Special cased for SQPOLL only, as we don't control when SQEs are consumed if
645 * that is used. Hence we may need to wait for the SQPOLL thread to keep up
646 * until we can get a new SQE. All other cases will break immediately, with a
647 * fresh SQE.
648 *
649 * If we grossly undersized our SQ ring, getting a NULL sqe can happen even
650 * for the !SQPOLL case if we're handling a lot of CQEs in our event loop
651 * and multishot isn't used. We can do io_uring_submit() to flush what we
652 * have here. Only caveat here is that if linked requests are used, SQEs
653 * would need to be allocated upfront as a link chain is only valid within
654 * a single submission cycle.
655 */
get_sqe(struct io_uring * ring)656 static struct io_uring_sqe *get_sqe(struct io_uring *ring)
657 {
658 struct io_uring_sqe *sqe;
659
660 do {
661 sqe = io_uring_get_sqe(ring);
662 if (sqe)
663 break;
664 if (!sqpoll)
665 io_uring_submit(ring);
666 else
667 io_uring_sqring_wait(ring);
668 } while (1);
669
670 return sqe;
671 }
672
673 /*
674 * See __encode_userdata() for how we encode sqe->user_data, which is passed
675 * back as cqe->user_data at completion time.
676 */
encode_userdata(struct io_uring_sqe * sqe,struct conn * c,int op,int bid,int fd)677 static void encode_userdata(struct io_uring_sqe *sqe, struct conn *c, int op,
678 int bid, int fd)
679 {
680 __encode_userdata(sqe, c->tid, op, bid, fd);
681 }
682
__submit_receive(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd)683 static void __submit_receive(struct io_uring *ring, struct conn *c,
684 struct conn_dir *cd, int fd)
685 {
686 struct conn_buf_ring *cbr = &c->in_br;
687 struct io_uring_sqe *sqe;
688
689 vlog("%d: submit receive fd=%d\n", c->tid, fd);
690
691 assert(!cd->pending_recv);
692 cd->pending_recv = 1;
693
694 /*
695 * For both recv and multishot receive, we use the ring provided
696 * buffers. These are handed to the application ahead of time, and
697 * are consumed when a receive triggers. Note that the address and
698 * length of the receive are set to NULL/0, and we assign the
699 * sqe->buf_group to tell the kernel which buffer group ID to pick
700 * a buffer from. Finally, IOSQE_BUFFER_SELECT is set to tell the
701 * kernel that we want a buffer picked for this request, we are not
702 * passing one in with the request.
703 */
704 sqe = get_sqe(ring);
705 if (rcv_msg) {
706 struct io_msg *imsg = &cd->io_rcv_msg;
707 struct msghdr *msg = &imsg->msg;
708
709 memset(msg, 0, sizeof(*msg));
710 msg->msg_iov = msg_vec(imsg)->iov;
711 msg->msg_iovlen = msg_vec(imsg)->iov_len;
712
713 if (recv_mshot) {
714 cd->rcv_mshot++;
715 io_uring_prep_recvmsg_multishot(sqe, fd, &imsg->msg, 0);
716 } else {
717 io_uring_prep_recvmsg(sqe, fd, &imsg->msg, 0);
718 }
719 } else {
720 if (recv_mshot) {
721 cd->rcv_mshot++;
722 io_uring_prep_recv_multishot(sqe, fd, NULL, 0, 0);
723 } else {
724 io_uring_prep_recv(sqe, fd, NULL, 0, 0);
725 }
726 }
727 encode_userdata(sqe, c, __RECV, 0, fd);
728 sqe->buf_group = cbr->bgid;
729 sqe->flags |= IOSQE_BUFFER_SELECT;
730 if (fixed_files)
731 sqe->flags |= IOSQE_FIXED_FILE;
732 if (rcv_bundle)
733 sqe->ioprio |= IORING_RECVSEND_BUNDLE;
734 }
735
736 /*
737 * One directional just arms receive on our in_fd
738 */
submit_receive(struct io_uring * ring,struct conn * c)739 static void submit_receive(struct io_uring *ring, struct conn *c)
740 {
741 __submit_receive(ring, c, &c->cd[0], c->in_fd);
742 }
743
744 /*
745 * Bi-directional arms receive on both in and out fd
746 */
submit_bidi_receive(struct io_uring * ring,struct conn * c)747 static void submit_bidi_receive(struct io_uring *ring, struct conn *c)
748 {
749 __submit_receive(ring, c, &c->cd[0], c->in_fd);
750 __submit_receive(ring, c, &c->cd[1], c->out_fd);
751 }
752
753 /*
754 * We hit -ENOBUFS, which means that we ran out of buffers in our current
755 * provided buffer group. This can happen if there's an imbalance between the
756 * receives coming in and the sends being processed, particularly with multishot
757 * receive as they can trigger very quickly. If this happens, defer arming a
758 * new receive until we've replenished half of the buffer pool by processing
759 * pending sends.
760 */
recv_enobufs(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd)761 static void recv_enobufs(struct io_uring *ring, struct conn *c,
762 struct conn_dir *cd, int fd)
763 {
764 vlog("%d: enobufs hit\n", c->tid);
765
766 cd->rcv_enobufs++;
767
768 /*
769 * If we're a sink, mark rcv as rearm. If we're not, then mark us as
770 * needing a rearm for receive and send. The completing send will
771 * kick the recv rearm.
772 */
773 if (!is_sink) {
774 int do_recv_arm = 1;
775
776 if (!cd->pending_send)
777 do_recv_arm = !prep_next_send(ring, c, cd, fd);
778 if (do_recv_arm)
779 __submit_receive(ring, c, &c->cd[0], c->in_fd);
780 } else {
781 __submit_receive(ring, c, &c->cd[0], c->in_fd);
782 }
783 }
784
785 /*
786 * Kill this socket - submit a shutdown and link a close to it. We don't
787 * care about shutdown status, so mark it as not needing to post a CQE unless
788 * it fails.
789 */
queue_shutdown_close(struct io_uring * ring,struct conn * c,int fd)790 static void queue_shutdown_close(struct io_uring *ring, struct conn *c, int fd)
791 {
792 struct io_uring_sqe *sqe1, *sqe2;
793
794 /*
795 * On the off chance that we run out of SQEs after the first one,
796 * grab two upfront. This it to prevent our link not working if
797 * get_sqe() ends up doing submissions to free up an SQE, as links
798 * are not valid across separate submissions.
799 */
800 sqe1 = get_sqe(ring);
801 sqe2 = get_sqe(ring);
802
803 io_uring_prep_shutdown(sqe1, fd, SHUT_RDWR);
804 if (fixed_files)
805 sqe1->flags |= IOSQE_FIXED_FILE;
806 sqe1->flags |= IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS;
807 encode_userdata(sqe1, c, __SHUTDOWN, 0, fd);
808
809 if (fixed_files)
810 io_uring_prep_close_direct(sqe2, fd);
811 else
812 io_uring_prep_close(sqe2, fd);
813 encode_userdata(sqe2, c, __CLOSE, 0, fd);
814 }
815
816 /*
817 * This connection is going away, queue a cancel for any pending recv, for
818 * example, we have pending for this ring. For completeness, we issue a cancel
819 * for any request we have pending for both in_fd and out_fd.
820 */
queue_cancel(struct io_uring * ring,struct conn * c)821 static void queue_cancel(struct io_uring *ring, struct conn *c)
822 {
823 struct io_uring_sqe *sqe;
824 int flags = 0;
825
826 if (fixed_files)
827 flags |= IORING_ASYNC_CANCEL_FD_FIXED;
828
829 sqe = get_sqe(ring);
830 io_uring_prep_cancel_fd(sqe, c->in_fd, flags);
831 encode_userdata(sqe, c, __CANCEL, 0, c->in_fd);
832 c->pending_cancels++;
833
834 if (c->out_fd != -1) {
835 sqe = get_sqe(ring);
836 io_uring_prep_cancel_fd(sqe, c->out_fd, flags);
837 encode_userdata(sqe, c, __CANCEL, 0, c->out_fd);
838 c->pending_cancels++;
839 }
840
841 io_uring_submit(ring);
842 }
843
pending_shutdown(struct conn * c)844 static int pending_shutdown(struct conn *c)
845 {
846 return c->cd[0].pending_shutdown + c->cd[1].pending_shutdown;
847 }
848
should_shutdown(struct conn * c)849 static bool should_shutdown(struct conn *c)
850 {
851 int i;
852
853 if (!pending_shutdown(c))
854 return false;
855 if (is_sink)
856 return true;
857 if (!bidi)
858 return c->cd[0].in_bytes == c->cd[1].out_bytes;
859
860 for (i = 0; i < 2; i++) {
861 if (c->cd[0].rcv != c->cd[1].snd)
862 return false;
863 if (c->cd[1].rcv != c->cd[0].snd)
864 return false;
865 }
866
867 return true;
868 }
869
870 /*
871 * Close this connection - send a ring message to the connection with intent
872 * to stop. When the client gets the message, it will initiate the stop.
873 */
__close_conn(struct io_uring * ring,struct conn * c)874 static void __close_conn(struct io_uring *ring, struct conn *c)
875 {
876 struct io_uring_sqe *sqe;
877 uint64_t user_data;
878
879 printf("Client %d: queueing stop\n", c->tid);
880
881 user_data = __raw_encode(c->tid, __STOP, 0, 0);
882 sqe = io_uring_get_sqe(ring);
883 io_uring_prep_msg_ring(sqe, c->ring.ring_fd, 0, user_data, 0);
884 encode_userdata(sqe, c, __NOP, 0, 0);
885 io_uring_submit(ring);
886 }
887
close_cd(struct conn * c,struct conn_dir * cd)888 static void close_cd(struct conn *c, struct conn_dir *cd)
889 {
890 cd->pending_shutdown = 1;
891
892 if (cd->pending_send)
893 return;
894
895 if (!(c->flags & CONN_F_PENDING_SHUTDOWN)) {
896 gettimeofday(&c->end_time, NULL);
897 c->flags |= CONN_F_PENDING_SHUTDOWN | CONN_F_END_TIME;
898 }
899 }
900
901 /*
902 * We're done with this buffer, add it back to our pool so the kernel is
903 * free to use it again.
904 */
replenish_buffer(struct conn_buf_ring * cbr,int bid,int offset)905 static int replenish_buffer(struct conn_buf_ring *cbr, int bid, int offset)
906 {
907 void *this_buf = cbr->buf + bid * buf_size;
908
909 assert(bid < nr_bufs);
910
911 io_uring_buf_ring_add(cbr->br, this_buf, buf_size, bid, br_mask, offset);
912 return buf_size;
913 }
914
915 /*
916 * Iterate buffers from '*bid' and with a total size of 'bytes' and add them
917 * back to our receive ring so they can be reused for new receives.
918 */
replenish_buffers(struct conn * c,int * bid,int bytes)919 static int replenish_buffers(struct conn *c, int *bid, int bytes)
920 {
921 struct conn_buf_ring *cbr = &c->in_br;
922 int nr_packets = 0;
923
924 while (bytes) {
925 int this_len = replenish_buffer(cbr, *bid, nr_packets);
926
927 if (this_len > bytes)
928 this_len = bytes;
929 bytes -= this_len;
930
931 *bid = (*bid + 1) & (nr_bufs - 1);
932 nr_packets++;
933 }
934
935 io_uring_buf_ring_advance(cbr->br, nr_packets);
936 return nr_packets;
937 }
938
free_mvec(struct msg_vec * mvec)939 static void free_mvec(struct msg_vec *mvec)
940 {
941 free(mvec->iov);
942 mvec->iov = NULL;
943 }
944
init_mvec(struct msg_vec * mvec)945 static void init_mvec(struct msg_vec *mvec)
946 {
947 memset(mvec, 0, sizeof(*mvec));
948 mvec->iov = malloc(sizeof(struct iovec));
949 mvec->vec_size = 1;
950 }
951
init_msgs(struct conn_dir * cd)952 static void init_msgs(struct conn_dir *cd)
953 {
954 memset(&cd->io_snd_msg, 0, sizeof(cd->io_snd_msg));
955 memset(&cd->io_rcv_msg, 0, sizeof(cd->io_rcv_msg));
956 init_mvec(&cd->io_snd_msg.vecs[0]);
957 init_mvec(&cd->io_snd_msg.vecs[1]);
958 init_mvec(&cd->io_rcv_msg.vecs[0]);
959 }
960
free_msgs(struct conn_dir * cd)961 static void free_msgs(struct conn_dir *cd)
962 {
963 free_mvec(&cd->io_snd_msg.vecs[0]);
964 free_mvec(&cd->io_snd_msg.vecs[1]);
965 free_mvec(&cd->io_rcv_msg.vecs[0]);
966 }
967
968 /*
969 * Multishot accept completion triggered. If we're acting as a sink, we're
970 * good to go. Just issue a receive for that case. If we're acting as a proxy,
971 * then start opening a socket that we can use to connect to the other end.
972 */
handle_accept(struct io_uring * ring,struct io_uring_cqe * cqe)973 static int handle_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
974 {
975 struct conn *c;
976 int i;
977
978 if (nr_conns == MAX_CONNS) {
979 fprintf(stderr, "max clients reached %d\n", nr_conns);
980 return 1;
981 }
982
983 /* main thread handles this, which is obviously serialized */
984 c = &conns[nr_conns];
985 c->tid = nr_conns++;
986 c->in_fd = -1;
987 c->out_fd = -1;
988
989 for (i = 0; i < 2; i++) {
990 struct conn_dir *cd = &c->cd[i];
991
992 cd->index = i;
993 cd->snd_next_bid = -1;
994 cd->rcv_next_bid = -1;
995 if (ext_stat) {
996 cd->rcv_bucket = calloc(nr_bufs + 1, sizeof(int));
997 cd->snd_bucket = calloc(nr_bufs + 1, sizeof(int));
998 }
999 init_msgs(cd);
1000 }
1001
1002 printf("New client: id=%d, in=%d\n", c->tid, c->in_fd);
1003 gettimeofday(&c->start_time, NULL);
1004
1005 pthread_barrier_init(&c->startup_barrier, NULL, 2);
1006 pthread_create(&c->thread, NULL, thread_main, c);
1007
1008 /*
1009 * Wait for thread to have its ring setup, then either assign the fd
1010 * if it's non-fixed, or pass the fixed one
1011 */
1012 pthread_barrier_wait(&c->startup_barrier);
1013 if (!fixed_files) {
1014 c->in_fd = cqe->res;
1015 } else {
1016 struct io_uring_sqe *sqe;
1017 uint64_t user_data;
1018
1019 /*
1020 * Ring has just been setup, we'll use index 0 as the descriptor
1021 * value.
1022 */
1023 user_data = __raw_encode(c->tid, __FD_PASS, 0, 0);
1024 sqe = io_uring_get_sqe(ring);
1025 io_uring_prep_msg_ring_fd(sqe, c->ring.ring_fd, cqe->res, 0,
1026 user_data, 0);
1027 encode_userdata(sqe, c, __NOP, 0, cqe->res);
1028 }
1029
1030 return 0;
1031 }
1032
1033 /*
1034 * Our socket request completed, issue a connect request to the other end.
1035 */
handle_sock(struct io_uring * ring,struct io_uring_cqe * cqe)1036 static int handle_sock(struct io_uring *ring, struct io_uring_cqe *cqe)
1037 {
1038 struct conn *c = cqe_to_conn(cqe);
1039 struct io_uring_sqe *sqe;
1040 int ret;
1041
1042 vlog("%d: sock: res=%d\n", c->tid, cqe->res);
1043
1044 c->out_fd = cqe->res;
1045
1046 if (ipv6) {
1047 memset(&c->addr6, 0, sizeof(c->addr6));
1048 c->addr6.sin6_family = AF_INET6;
1049 c->addr6.sin6_port = htons(send_port);
1050 ret = inet_pton(AF_INET6, host, &c->addr6.sin6_addr);
1051 } else {
1052 memset(&c->addr, 0, sizeof(c->addr));
1053 c->addr.sin_family = AF_INET;
1054 c->addr.sin_port = htons(send_port);
1055 ret = inet_pton(AF_INET, host, &c->addr.sin_addr);
1056 }
1057 if (ret <= 0) {
1058 if (!ret)
1059 fprintf(stderr, "host not in right format\n");
1060 else
1061 perror("inet_pton");
1062 return 1;
1063 }
1064
1065 sqe = get_sqe(ring);
1066 if (ipv6) {
1067 io_uring_prep_connect(sqe, c->out_fd,
1068 (struct sockaddr *) &c->addr6,
1069 sizeof(c->addr6));
1070 } else {
1071 io_uring_prep_connect(sqe, c->out_fd,
1072 (struct sockaddr *) &c->addr,
1073 sizeof(c->addr));
1074 }
1075 encode_userdata(sqe, c, __CONNECT, 0, c->out_fd);
1076 if (fixed_files)
1077 sqe->flags |= IOSQE_FIXED_FILE;
1078 return 0;
1079 }
1080
1081 /*
1082 * Connection to the other end is done, submit a receive to start receiving
1083 * data. If we're a bidirectional proxy, issue a receive on both ends. If not,
1084 * then just a single recv will do.
1085 */
handle_connect(struct io_uring * ring,struct io_uring_cqe * cqe)1086 static int handle_connect(struct io_uring *ring, struct io_uring_cqe *cqe)
1087 {
1088 struct conn *c = cqe_to_conn(cqe);
1089
1090 pthread_mutex_lock(&thread_lock);
1091 open_conns++;
1092 pthread_mutex_unlock(&thread_lock);
1093
1094 if (bidi)
1095 submit_bidi_receive(ring, c);
1096 else
1097 submit_receive(ring, c);
1098
1099 return 0;
1100 }
1101
1102 /*
1103 * Append new segment to our currently active msg_vec. This will be submitted
1104 * as a sendmsg (with all of it), or as separate sends, later. If we're using
1105 * send_ring, then we won't hit this path. Instead, outgoing buffers are
1106 * added directly to our outgoing send buffer ring.
1107 */
send_append_vec(struct conn_dir * cd,void * data,int len)1108 static void send_append_vec(struct conn_dir *cd, void *data, int len)
1109 {
1110 struct msg_vec *mvec = snd_msg_vec(cd);
1111
1112 if (mvec->iov_len == mvec->vec_size) {
1113 mvec->vec_size <<= 1;
1114 mvec->iov = realloc(mvec->iov, mvec->vec_size * sizeof(struct iovec));
1115 }
1116
1117 mvec->iov[mvec->iov_len].iov_base = data;
1118 mvec->iov[mvec->iov_len].iov_len = len;
1119 mvec->iov_len++;
1120 }
1121
1122 /*
1123 * Queue a send based on the data received in this cqe, which came from
1124 * a completed receive operation.
1125 */
send_append(struct conn * c,struct conn_dir * cd,void * data,int bid,int len)1126 static void send_append(struct conn *c, struct conn_dir *cd, void *data,
1127 int bid, int len)
1128 {
1129 vlog("%d: send %d (%p, bid %d)\n", c->tid, len, data, bid);
1130
1131 assert(bid < nr_bufs);
1132
1133 /* if using provided buffers for send, add it upfront */
1134 if (send_ring) {
1135 struct conn_buf_ring *cbr = &c->out_br;
1136
1137 io_uring_buf_ring_add(cbr->br, data, len, bid, br_mask, 0);
1138 io_uring_buf_ring_advance(cbr->br, 1);
1139 } else {
1140 send_append_vec(cd, data, len);
1141 }
1142 }
1143
1144 /*
1145 * For non recvmsg && multishot, a zero receive marks the end. For recvmsg
1146 * with multishot, we always get the header regardless. Hence a "zero receive"
1147 * is the size of the header.
1148 */
recv_done_res(int res)1149 static int recv_done_res(int res)
1150 {
1151 if (!res)
1152 return 1;
1153 if (rcv_msg && recv_mshot && res == sizeof(struct io_uring_recvmsg_out))
1154 return 1;
1155 return 0;
1156 }
1157
recv_inc(struct conn * c,struct conn_dir * cd,int * bid,struct io_uring_cqe * cqe)1158 static int recv_inc(struct conn *c, struct conn_dir *cd, int *bid,
1159 struct io_uring_cqe *cqe)
1160 {
1161 struct conn_buf_ring *cbr = &c->out_br;
1162 struct conn_buf_ring *in_cbr = &c->in_br;
1163 void *data;
1164
1165 if (!cqe->res)
1166 return 0;
1167 if (cqe->flags & IORING_CQE_F_BUF_MORE)
1168 return 0;
1169
1170 data = in_cbr->buf + *bid * buf_size;
1171 if (is_sink) {
1172 io_uring_buf_ring_add(in_cbr->br, data, buf_size, *bid, br_mask, 0);
1173 io_uring_buf_ring_advance(in_cbr->br, 1);
1174 } else if (send_ring) {
1175 io_uring_buf_ring_add(cbr->br, data, buf_size, *bid, br_mask, 0);
1176 io_uring_buf_ring_advance(cbr->br, 1);
1177 } else {
1178 send_append(c, cd, data, *bid, buf_size);
1179 }
1180 *bid = (*bid + 1) & (nr_bufs - 1);
1181 return 1;
1182 }
1183
1184 /*
1185 * Any receive that isn't recvmsg with multishot can be handled the same way.
1186 * Iterate from '*bid' and 'in_bytes' in total, and append the data to the
1187 * outgoing queue.
1188 */
recv_bids(struct conn * c,struct conn_dir * cd,int * bid,int in_bytes)1189 static int recv_bids(struct conn *c, struct conn_dir *cd, int *bid, int in_bytes)
1190 {
1191 struct conn_buf_ring *cbr = &c->out_br;
1192 struct conn_buf_ring *in_cbr = &c->in_br;
1193 struct io_uring_buf *buf;
1194 int nr_packets = 0;
1195
1196 while (in_bytes) {
1197 int this_bytes;
1198 void *data;
1199
1200 buf = &in_cbr->br->bufs[*bid];
1201 data = (void *) (unsigned long) buf->addr;
1202 this_bytes = buf->len;
1203 if (this_bytes > in_bytes)
1204 this_bytes = in_bytes;
1205
1206 in_bytes -= this_bytes;
1207
1208 if (send_ring)
1209 io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1210 br_mask, nr_packets);
1211 else
1212 send_append(c, cd, data, *bid, this_bytes);
1213
1214 *bid = (*bid + 1) & (nr_bufs - 1);
1215 nr_packets++;
1216 }
1217
1218 if (send_ring)
1219 io_uring_buf_ring_advance(cbr->br, nr_packets);
1220
1221 return nr_packets;
1222 }
1223
1224 /*
1225 * Special handling of recvmsg with multishot
1226 */
recv_mshot_msg(struct conn * c,struct conn_dir * cd,int * bid,int in_bytes)1227 static int recv_mshot_msg(struct conn *c, struct conn_dir *cd, int *bid,
1228 int in_bytes)
1229 {
1230 struct conn_buf_ring *cbr = &c->out_br;
1231 struct conn_buf_ring *in_cbr = &c->in_br;
1232 struct io_uring_buf *buf;
1233 int nr_packets = 0;
1234
1235 while (in_bytes) {
1236 struct io_uring_recvmsg_out *pdu;
1237 int this_bytes;
1238 void *data;
1239
1240 buf = &in_cbr->br->bufs[*bid];
1241
1242 /*
1243 * multishot recvmsg puts a header in front of the data - we
1244 * have to take that into account for the send setup, and
1245 * adjust the actual data read to not take this metadata into
1246 * account. For this use case, namelen and controllen will not
1247 * be set. If they were, they would need to be factored in too.
1248 */
1249 buf->len -= sizeof(struct io_uring_recvmsg_out);
1250 in_bytes -= sizeof(struct io_uring_recvmsg_out);
1251
1252 pdu = (void *) (unsigned long) buf->addr;
1253 vlog("pdu namelen %d, controllen %d, payload %d flags %x\n",
1254 pdu->namelen, pdu->controllen, pdu->payloadlen,
1255 pdu->flags);
1256 data = (void *) (pdu + 1);
1257
1258 this_bytes = pdu->payloadlen;
1259 if (this_bytes > in_bytes)
1260 this_bytes = in_bytes;
1261
1262 in_bytes -= this_bytes;
1263
1264 if (send_ring)
1265 io_uring_buf_ring_add(cbr->br, data, this_bytes, *bid,
1266 br_mask, nr_packets);
1267 else
1268 send_append(c, cd, data, *bid, this_bytes);
1269
1270 *bid = (*bid + 1) & (nr_bufs - 1);
1271 nr_packets++;
1272 }
1273
1274 if (send_ring)
1275 io_uring_buf_ring_advance(cbr->br, nr_packets);
1276
1277 return nr_packets;
1278 }
1279
__handle_recv(struct io_uring * ring,struct conn * c,struct conn_dir * cd,struct io_uring_cqe * cqe)1280 static int __handle_recv(struct io_uring *ring, struct conn *c,
1281 struct conn_dir *cd, struct io_uring_cqe *cqe)
1282 {
1283 struct conn_dir *ocd = &c->cd[!cd->index];
1284 int bid, nr_packets;
1285
1286 /*
1287 * Not having a buffer attached should only happen if we get a zero
1288 * sized receive, because the other end closed the connection. It
1289 * cannot happen otherwise, as all our receives are using provided
1290 * buffers and hence it's not possible to return a CQE with a non-zero
1291 * result and not have a buffer attached.
1292 */
1293 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1294 cd->pending_recv = 0;
1295
1296 if (!recv_done_res(cqe->res)) {
1297 fprintf(stderr, "no buffer assigned, res=%d\n", cqe->res);
1298 return 1;
1299 }
1300 start_close:
1301 prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1302 close_cd(c, cd);
1303 return 0;
1304 }
1305
1306 if (cqe->res && cqe->res < buf_size)
1307 cd->rcv_shrt++;
1308
1309 bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1310
1311 /*
1312 * BIDI will use the same buffer pool and do receive on both CDs,
1313 * so can't reliably check. TODO.
1314 */
1315 if (!bidi && cd->rcv_next_bid != -1 && bid != cd->rcv_next_bid) {
1316 fprintf(stderr, "recv bid %d, wanted %d\n", bid, cd->rcv_next_bid);
1317 goto start_close;
1318 }
1319
1320 vlog("%d: recv: bid=%d, res=%d, cflags=%x\n", c->tid, bid, cqe->res, cqe->flags);
1321 /*
1322 * If we're a sink, we're done here. Just replenish the buffer back
1323 * to the pool. For proxy mode, we will send the data to the other
1324 * end and the buffer will be replenished once the send is done with
1325 * it.
1326 */
1327 if (buf_ring_inc)
1328 nr_packets = recv_inc(c, ocd, &bid, cqe);
1329 else if (is_sink)
1330 nr_packets = replenish_buffers(c, &bid, cqe->res);
1331 else if (rcv_msg && recv_mshot)
1332 nr_packets = recv_mshot_msg(c, ocd, &bid, cqe->res);
1333 else
1334 nr_packets = recv_bids(c, ocd, &bid, cqe->res);
1335
1336 if (cd->rcv_bucket)
1337 cd->rcv_bucket[nr_packets]++;
1338
1339 if (!is_sink) {
1340 ocd->out_buffers += nr_packets;
1341 assert(ocd->out_buffers <= nr_bufs);
1342 }
1343
1344 cd->rcv++;
1345 cd->rcv_next_bid = bid;
1346
1347 /*
1348 * If IORING_CQE_F_MORE isn't set, then this is either a normal recv
1349 * that needs rearming, or it's a multishot that won't post any further
1350 * completions. Setup a new one for these cases.
1351 */
1352 if (!(cqe->flags & IORING_CQE_F_MORE)) {
1353 cd->pending_recv = 0;
1354 if (recv_done_res(cqe->res))
1355 goto start_close;
1356 if (is_sink || !ocd->pending_send)
1357 __submit_receive(ring, c, &c->cd[0], c->in_fd);
1358 }
1359
1360 /*
1361 * Submit a send if we won't get anymore notifications from this
1362 * recv, or if we have nr_bufs / 2 queued up. If BIDI mode, send
1363 * every buffer. We assume this is interactive mode, and hence don't
1364 * delay anything.
1365 */
1366 if (((!ocd->pending_send && (bidi || (ocd->out_buffers >= nr_bufs / 2))) ||
1367 !(cqe->flags & IORING_CQE_F_MORE)) && !is_sink)
1368 prep_next_send(ring, c, ocd, other_dir_fd(c, cqe_to_fd(cqe)));
1369
1370 if (!recv_done_res(cqe->res))
1371 cd->in_bytes += cqe->res;
1372 return 0;
1373 }
1374
/*
 * Receive completion entry point: look up the connection and direction the
 * CQE belongs to, and let __handle_recv() do the work. Returns whatever
 * __handle_recv() returns.
 */
static int handle_recv(struct io_uring *ring, struct io_uring_cqe *cqe)
{
	struct conn *conn = cqe_to_conn(cqe);

	return __handle_recv(ring, conn, cqe_to_conn_dir(conn, cqe), cqe);
}
1382
recv_error(struct error_handler * err,struct io_uring * ring,struct io_uring_cqe * cqe)1383 static int recv_error(struct error_handler *err, struct io_uring *ring,
1384 struct io_uring_cqe *cqe)
1385 {
1386 struct conn *c = cqe_to_conn(cqe);
1387 struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1388
1389 cd->pending_recv = 0;
1390
1391 if (cqe->res != -ENOBUFS)
1392 return default_error(err, ring, cqe);
1393
1394 recv_enobufs(ring, c, cd, other_dir_fd(c, cqe_to_fd(cqe)));
1395 return 0;
1396 }
1397
submit_send(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd,void * data,int len,int bid,int flags)1398 static void submit_send(struct io_uring *ring, struct conn *c,
1399 struct conn_dir *cd, int fd, void *data, int len,
1400 int bid, int flags)
1401 {
1402 struct io_uring_sqe *sqe;
1403 int bgid = c->out_br.bgid;
1404
1405 if (cd->pending_send)
1406 return;
1407 cd->pending_send = 1;
1408
1409 flags |= MSG_WAITALL | MSG_NOSIGNAL;
1410
1411 sqe = get_sqe(ring);
1412 if (snd_msg) {
1413 struct io_msg *imsg = &cd->io_snd_msg;
1414
1415 if (snd_zc) {
1416 io_uring_prep_sendmsg_zc(sqe, fd, &imsg->msg, flags);
1417 cd->snd_notif++;
1418 } else {
1419 io_uring_prep_sendmsg(sqe, fd, &imsg->msg, flags);
1420 }
1421 } else if (send_ring) {
1422 io_uring_prep_send(sqe, fd, NULL, 0, flags);
1423 } else if (!snd_zc) {
1424 io_uring_prep_send(sqe, fd, data, len, flags);
1425 } else {
1426 io_uring_prep_send_zc(sqe, fd, data, len, flags, 0);
1427 sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
1428 sqe->buf_index = bid;
1429 cd->snd_notif++;
1430 }
1431 encode_userdata(sqe, c, __SEND, bid, fd);
1432 if (fixed_files)
1433 sqe->flags |= IOSQE_FIXED_FILE;
1434 if (send_ring) {
1435 sqe->flags |= IOSQE_BUFFER_SELECT;
1436 sqe->buf_group = bgid;
1437 }
1438 if (snd_bundle) {
1439 sqe->ioprio |= IORING_RECVSEND_BUNDLE;
1440 cd->snd_mshot++;
1441 } else if (send_ring)
1442 cd->snd_mshot++;
1443 }
1444
1445 /*
1446 * Prepare the next send request, if we need to. If one is already pending,
1447 * or if we're a sink and we don't need to do sends, then there's nothing
1448 * to do.
1449 *
1450 * Return 1 if another send completion is expected, 0 if not.
1451 */
prep_next_send(struct io_uring * ring,struct conn * c,struct conn_dir * cd,int fd)1452 static int prep_next_send(struct io_uring *ring, struct conn *c,
1453 struct conn_dir *cd, int fd)
1454 {
1455 int bid;
1456
1457 if (cd->pending_send || is_sink)
1458 return 0;
1459 if (!cd->out_buffers)
1460 return 0;
1461
1462 bid = cd->snd_next_bid;
1463 if (bid == -1)
1464 bid = 0;
1465
1466 if (send_ring) {
1467 /*
1468 * send_ring mode is easy, there's nothing to do but submit
1469 * our next send request. That will empty the entire outgoing
1470 * queue.
1471 */
1472 submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1473 return 1;
1474 } else if (snd_msg) {
1475 /*
1476 * For sendmsg mode, submit our currently prepared iovec, if
1477 * we have one, and swap our iovecs so that any further
1478 * receives will start preparing that one.
1479 */
1480 struct io_msg *imsg = &cd->io_snd_msg;
1481
1482 if (!msg_vec(imsg)->iov_len)
1483 return 0;
1484 imsg->msg.msg_iov = msg_vec(imsg)->iov;
1485 imsg->msg.msg_iovlen = msg_vec(imsg)->iov_len;
1486 msg_vec(imsg)->iov_len = 0;
1487 imsg->vec_index = !imsg->vec_index;
1488 submit_send(ring, c, cd, fd, NULL, 0, bid, 0);
1489 return 1;
1490 } else {
1491 /*
1492 * send without send_ring - submit the next available vec,
1493 * if any. If this vec is the last one in the current series,
1494 * then swap to the next vec. We flag each send with MSG_MORE,
1495 * unless this is the last part of the current vec.
1496 */
1497 struct io_msg *imsg = &cd->io_snd_msg;
1498 struct msg_vec *mvec = msg_vec(imsg);
1499 int flags = !snd_zc ? MSG_MORE : 0;
1500 struct iovec *iov;
1501
1502 if (mvec->iov_len == mvec->cur_iov)
1503 return 0;
1504 imsg->msg.msg_iov = msg_vec(imsg)->iov;
1505 iov = &mvec->iov[mvec->cur_iov];
1506 mvec->cur_iov++;
1507 if (mvec->cur_iov == mvec->iov_len) {
1508 mvec->iov_len = 0;
1509 mvec->cur_iov = 0;
1510 imsg->vec_index = !imsg->vec_index;
1511 flags = 0;
1512 }
1513 submit_send(ring, c, cd, fd, iov->iov_base, iov->iov_len, bid, flags);
1514 return 1;
1515 }
1516 }
1517
handle_send_inc(struct conn * c,struct conn_dir * cd,int bid,struct io_uring_cqe * cqe)1518 static int handle_send_inc(struct conn *c, struct conn_dir *cd, int bid,
1519 struct io_uring_cqe *cqe)
1520 {
1521 struct conn_buf_ring *in_cbr = &c->in_br;
1522 int ret = 0;
1523 void *data;
1524
1525 if (!cqe->res)
1526 goto out;
1527 if (cqe->flags & IORING_CQE_F_BUF_MORE)
1528 return 0;
1529
1530 assert(cqe->res <= buf_size);
1531 cd->out_bytes += cqe->res;
1532
1533 data = in_cbr->buf + bid * buf_size;
1534 io_uring_buf_ring_add(in_cbr->br, data, buf_size, bid, br_mask, 0);
1535 io_uring_buf_ring_advance(in_cbr->br, 1);
1536 bid = (bid + 1) & (nr_bufs - 1);
1537 ret = 1;
1538 out:
1539 if (pending_shutdown(c))
1540 close_cd(c, cd);
1541
1542 return ret;
1543 }
1544
1545 /*
1546 * Handling a send with an outgoing send ring. Get the buffers from the
1547 * receive side, and add them to the ingoing buffer ring again.
1548 */
handle_send_ring(struct conn * c,struct conn_dir * cd,int bid,int bytes)1549 static int handle_send_ring(struct conn *c, struct conn_dir *cd, int bid,
1550 int bytes)
1551 {
1552 struct conn_buf_ring *in_cbr = &c->in_br;
1553 struct conn_buf_ring *out_cbr = &c->out_br;
1554 int i = 0;
1555
1556 while (bytes) {
1557 struct io_uring_buf *buf = &out_cbr->br->bufs[bid];
1558 int this_bytes;
1559 void *this_buf;
1560
1561 this_bytes = buf->len;
1562 if (this_bytes > bytes)
1563 this_bytes = bytes;
1564
1565 cd->out_bytes += this_bytes;
1566
1567 vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1568
1569 this_buf = in_cbr->buf + bid * buf_size;
1570 io_uring_buf_ring_add(in_cbr->br, this_buf, buf_size, bid, br_mask, i);
1571 /*
1572 * Find the provided buffer that the receive consumed, and
1573 * which we then used for the send, and add it back to the
1574 * pool so it can get picked by another receive. Once the send
1575 * is done, we're done with it.
1576 */
1577 bid = (bid + 1) & (nr_bufs - 1);
1578 bytes -= this_bytes;
1579 i++;
1580 }
1581 cd->snd_next_bid = bid;
1582 io_uring_buf_ring_advance(in_cbr->br, i);
1583
1584 if (pending_shutdown(c))
1585 close_cd(c, cd);
1586
1587 return i;
1588 }
1589
1590 /*
1591 * sendmsg, or send without a ring. Just add buffers back to the ingoing
1592 * ring for receives.
1593 */
handle_send_buf(struct conn * c,struct conn_dir * cd,int bid,int bytes)1594 static int handle_send_buf(struct conn *c, struct conn_dir *cd, int bid,
1595 int bytes)
1596 {
1597 struct conn_buf_ring *in_cbr = &c->in_br;
1598 int i = 0;
1599
1600 while (bytes) {
1601 struct io_uring_buf *buf = &in_cbr->br->bufs[bid];
1602 int this_bytes;
1603
1604 this_bytes = bytes;
1605 if (this_bytes > buf->len)
1606 this_bytes = buf->len;
1607
1608 vlog("%d: send: bid=%d, len=%d\n", c->tid, bid, this_bytes);
1609
1610 cd->out_bytes += this_bytes;
1611 /* each recvmsg mshot package has this overhead */
1612 if (rcv_msg && recv_mshot)
1613 cd->out_bytes += sizeof(struct io_uring_recvmsg_out);
1614 replenish_buffer(in_cbr, bid, i);
1615 bid = (bid + 1) & (nr_bufs - 1);
1616 bytes -= this_bytes;
1617 i++;
1618 }
1619 io_uring_buf_ring_advance(in_cbr->br, i);
1620 cd->snd_next_bid = bid;
1621 return i;
1622 }
1623
__handle_send(struct io_uring * ring,struct conn * c,struct conn_dir * cd,struct io_uring_cqe * cqe)1624 static int __handle_send(struct io_uring *ring, struct conn *c,
1625 struct conn_dir *cd, struct io_uring_cqe *cqe)
1626 {
1627 struct conn_dir *ocd;
1628 int bid, nr_packets;
1629
1630 if (send_ring) {
1631 if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
1632 fprintf(stderr, "no buffer in send?! %d\n", cqe->res);
1633 return 1;
1634 }
1635 bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
1636 } else {
1637 bid = cqe_to_bid(cqe);
1638 }
1639
1640 /*
1641 * CQE notifications only happen with send/sendmsg zerocopy. They
1642 * tell us that the data has been acked, and that hence the buffer
1643 * is now free to reuse. Waiting on an ACK for each packet will slow
1644 * us down tremendously, so do all of our sends and then wait for
1645 * the ACKs to come in. They tend to come in bundles anyway. Once
1646 * all acks are done (cd->snd_notif == 0), then fire off the next
1647 * receive.
1648 */
1649 if (cqe->flags & IORING_CQE_F_NOTIF) {
1650 cd->snd_notif--;
1651 } else {
1652 if (cqe->res && cqe->res < buf_size)
1653 cd->snd_shrt++;
1654
1655 /*
1656 * BIDI will use the same buffer pool and do sends on both CDs,
1657 * so can't reliably check. TODO.
1658 */
1659 if (!bidi && send_ring && cd->snd_next_bid != -1 &&
1660 bid != cd->snd_next_bid) {
1661 fprintf(stderr, "send bid %d, wanted %d at %lu\n", bid,
1662 cd->snd_next_bid, cd->out_bytes);
1663 goto out_close;
1664 }
1665
1666 assert(bid <= nr_bufs);
1667
1668 vlog("send: got %d, %lu\n", cqe->res, cd->out_bytes);
1669
1670 if (buf_ring_inc)
1671 nr_packets = handle_send_inc(c, cd, bid, cqe);
1672 else if (send_ring)
1673 nr_packets = handle_send_ring(c, cd, bid, cqe->res);
1674 else
1675 nr_packets = handle_send_buf(c, cd, bid, cqe->res);
1676
1677 if (cd->snd_bucket)
1678 cd->snd_bucket[nr_packets]++;
1679
1680 cd->out_buffers -= nr_packets;
1681 assert(cd->out_buffers >= 0);
1682
1683 cd->snd++;
1684 }
1685
1686 if (!(cqe->flags & IORING_CQE_F_MORE)) {
1687 int do_recv_arm;
1688
1689 cd->pending_send = 0;
1690
1691 /*
1692 * send done - see if the current vec has data to submit, and
1693 * do so if it does. if it doesn't have data yet, nothing to
1694 * do.
1695 */
1696 do_recv_arm = !prep_next_send(ring, c, cd, cqe_to_fd(cqe));
1697
1698 ocd = &c->cd[!cd->index];
1699 if (!cd->snd_notif && do_recv_arm && !ocd->pending_recv) {
1700 int fd = other_dir_fd(c, cqe_to_fd(cqe));
1701
1702 __submit_receive(ring, c, ocd, fd);
1703 }
1704 out_close:
1705 if (pending_shutdown(c))
1706 close_cd(c, cd);
1707 }
1708
1709 vlog("%d: pending sends %d\n", c->tid, cd->pending_send);
1710 return 0;
1711 }
1712
/*
 * Send completion entry point: look up the connection and direction the
 * CQE belongs to, and let __handle_send() do the work. Returns whatever
 * __handle_send() returns.
 */
static int handle_send(struct io_uring *ring, struct io_uring_cqe *cqe)
{
	struct conn *conn = cqe_to_conn(cqe);

	return __handle_send(ring, conn, cqe_to_conn_dir(conn, cqe), cqe);
}
1720
send_error(struct error_handler * err,struct io_uring * ring,struct io_uring_cqe * cqe)1721 static int send_error(struct error_handler *err, struct io_uring *ring,
1722 struct io_uring_cqe *cqe)
1723 {
1724 struct conn *c = cqe_to_conn(cqe);
1725 struct conn_dir *cd = cqe_to_conn_dir(c, cqe);
1726
1727 cd->pending_send = 0;
1728
1729 /* res can have high bit set */
1730 if (cqe->flags & IORING_CQE_F_NOTIF)
1731 return handle_send(ring, cqe);
1732 if (cqe->res != -ENOBUFS)
1733 return default_error(err, ring, cqe);
1734
1735 cd->snd_enobufs++;
1736 return 0;
1737 }
1738
1739 /*
1740 * We don't expect to get here, as we marked it with skipping posting a
1741 * CQE if it was successful. If it does trigger, than means it fails and
1742 * that our close has not been done. Log the shutdown error and issue a new
1743 * separate close.
1744 */
handle_shutdown(struct io_uring * ring,struct io_uring_cqe * cqe)1745 static int handle_shutdown(struct io_uring *ring, struct io_uring_cqe *cqe)
1746 {
1747 struct conn *c = cqe_to_conn(cqe);
1748 struct io_uring_sqe *sqe;
1749 int fd = cqe_to_fd(cqe);
1750
1751 fprintf(stderr, "Got shutdown notification on fd %d\n", fd);
1752
1753 if (!cqe->res)
1754 fprintf(stderr, "Unexpected success shutdown CQE\n");
1755 else if (cqe->res < 0)
1756 fprintf(stderr, "Shutdown got %s\n", strerror(-cqe->res));
1757
1758 sqe = get_sqe(ring);
1759 if (fixed_files)
1760 io_uring_prep_close_direct(sqe, fd);
1761 else
1762 io_uring_prep_close(sqe, fd);
1763 encode_userdata(sqe, c, __CLOSE, 0, fd);
1764 return 0;
1765 }
1766
1767 /*
1768 * Final stage of a connection, the shutdown and close has finished. Mark
1769 * it as disconnected and let the main loop reap it.
1770 */
handle_close(struct io_uring * ring,struct io_uring_cqe * cqe)1771 static int handle_close(struct io_uring *ring, struct io_uring_cqe *cqe)
1772 {
1773 struct conn *c = cqe_to_conn(cqe);
1774 int fd = cqe_to_fd(cqe);
1775
1776 printf("Closed client: id=%d, in_fd=%d, out_fd=%d\n", c->tid, c->in_fd, c->out_fd);
1777 if (fd == c->in_fd)
1778 c->in_fd = -1;
1779 else if (fd == c->out_fd)
1780 c->out_fd = -1;
1781
1782 if (c->in_fd == -1 && c->out_fd == -1) {
1783 c->flags |= CONN_F_DISCONNECTED;
1784
1785 pthread_mutex_lock(&thread_lock);
1786 __show_stats(c);
1787 open_conns--;
1788 pthread_mutex_unlock(&thread_lock);
1789 free_buffer_rings(ring, c);
1790 free_msgs(&c->cd[0]);
1791 free_msgs(&c->cd[1]);
1792 free(c->cd[0].rcv_bucket);
1793 free(c->cd[0].snd_bucket);
1794 }
1795
1796 return 0;
1797 }
1798
handle_cancel(struct io_uring * ring,struct io_uring_cqe * cqe)1799 static int handle_cancel(struct io_uring *ring, struct io_uring_cqe *cqe)
1800 {
1801 struct conn *c = cqe_to_conn(cqe);
1802 int fd = cqe_to_fd(cqe);
1803
1804 c->pending_cancels--;
1805
1806 vlog("%d: got cancel fd %d, refs %d\n", c->tid, fd, c->pending_cancels);
1807
1808 if (!c->pending_cancels) {
1809 queue_shutdown_close(ring, c, c->in_fd);
1810 if (c->out_fd != -1)
1811 queue_shutdown_close(ring, c, c->out_fd);
1812 io_uring_submit(ring);
1813 }
1814
1815 return 0;
1816 }
1817
open_socket(struct conn * c)1818 static void open_socket(struct conn *c)
1819 {
1820 if (is_sink) {
1821 pthread_mutex_lock(&thread_lock);
1822 open_conns++;
1823 pthread_mutex_unlock(&thread_lock);
1824
1825 submit_receive(&c->ring, c);
1826 } else {
1827 struct io_uring_sqe *sqe;
1828 int domain;
1829
1830 if (ipv6)
1831 domain = AF_INET6;
1832 else
1833 domain = AF_INET;
1834
1835 /*
1836 * If fixed_files is set, proxy will use fixed files for any new
1837 * file descriptors it instantiates. Fixd files, or fixed
1838 * descriptors, are io_uring private file descriptors. They
1839 * cannot be accessed outside of io_uring. io_uring holds a
1840 * fixed reference to them, which means that we do not need to
1841 * grab per-request references to them. Particularly for
1842 * threaded applications, grabbing and dropping file references
1843 * for each operation can be costly as the file table is shared.
1844 * This generally shows up as fget/fput related overhead in any
1845 * workload profiles.
1846 *
1847 * Fixed descriptors are passed in via the 'fd' field just like
1848 * regular descriptors, and then marked as such by setting the
1849 * IOSQE_FIXED_FILE flag in the sqe->flags field. Some helpers
1850 * do that automatically, like the below, others will need it
1851 * set manually if they don't have a *direct*() helper.
1852 *
1853 * For operations that instantiate them, like the opening of a
1854 * direct socket, the application may either ask the kernel to
1855 * find a free one (as is done below), or the application may
1856 * manage the space itself and pass in an index for a currently
1857 * free slot in the table. If the kernel is asked to allocate a
1858 * free direct descriptor, note that io_uring does not abide by
1859 * the POSIX mandated "lowest free must be returned". It may
1860 * return any free descriptor of its choosing.
1861 */
1862 sqe = get_sqe(&c->ring);
1863 if (fixed_files)
1864 io_uring_prep_socket_direct_alloc(sqe, domain, SOCK_STREAM, 0, 0);
1865 else
1866 io_uring_prep_socket(sqe, domain, SOCK_STREAM, 0, 0);
1867 encode_userdata(sqe, c, __SOCK, 0, 0);
1868 }
1869 }
1870
1871 /*
1872 * Start of connection, we got our in descriptor.
1873 */
handle_fd_pass(struct io_uring_cqe * cqe)1874 static int handle_fd_pass(struct io_uring_cqe *cqe)
1875 {
1876 struct conn *c = cqe_to_conn(cqe);
1877 int fd = cqe_to_fd(cqe);
1878
1879 vlog("%d: got fd pass %d\n", c->tid, fd);
1880 c->in_fd = fd;
1881 open_socket(c);
1882 return 0;
1883 }
1884
handle_stop(struct io_uring_cqe * cqe)1885 static int handle_stop(struct io_uring_cqe *cqe)
1886 {
1887 struct conn *c = cqe_to_conn(cqe);
1888
1889 printf("Client %d: queueing shutdown\n", c->tid);
1890 queue_cancel(&c->ring, c);
1891 return 0;
1892 }
1893
1894 /*
1895 * Called for each CQE that we receive. Decode the request type that it
1896 * came from, and call the appropriate handler.
1897 */
handle_cqe(struct io_uring * ring,struct io_uring_cqe * cqe)1898 static int handle_cqe(struct io_uring *ring, struct io_uring_cqe *cqe)
1899 {
1900 int ret;
1901
1902 /*
1903 * Unlikely, but there's an error in this CQE. If an error handler
1904 * is defined, call it, and that will deal with it. If no error
1905 * handler is defined, the opcode handler either doesn't care or will
1906 * handle it on its own.
1907 */
1908 if (cqe->res < 0) {
1909 struct error_handler *err = &error_handlers[cqe_to_op(cqe)];
1910
1911 if (err->error_fn)
1912 return err->error_fn(err, ring, cqe);
1913 }
1914
1915 switch (cqe_to_op(cqe)) {
1916 case __ACCEPT:
1917 ret = handle_accept(ring, cqe);
1918 break;
1919 case __SOCK:
1920 ret = handle_sock(ring, cqe);
1921 break;
1922 case __CONNECT:
1923 ret = handle_connect(ring, cqe);
1924 break;
1925 case __RECV:
1926 case __RECVMSG:
1927 ret = handle_recv(ring, cqe);
1928 break;
1929 case __SEND:
1930 case __SENDMSG:
1931 ret = handle_send(ring, cqe);
1932 break;
1933 case __CANCEL:
1934 ret = handle_cancel(ring, cqe);
1935 break;
1936 case __SHUTDOWN:
1937 ret = handle_shutdown(ring, cqe);
1938 break;
1939 case __CLOSE:
1940 ret = handle_close(ring, cqe);
1941 break;
1942 case __FD_PASS:
1943 ret = handle_fd_pass(cqe);
1944 break;
1945 case __STOP:
1946 ret = handle_stop(cqe);
1947 break;
1948 case __NOP:
1949 ret = 0;
1950 break;
1951 default:
1952 fprintf(stderr, "bad user data %lx\n", (long) cqe->user_data);
1953 return 1;
1954 }
1955
1956 return ret;
1957 }
1958
house_keeping(struct io_uring * ring)1959 static void house_keeping(struct io_uring *ring)
1960 {
1961 static unsigned long last_bytes;
1962 unsigned long bytes, elapsed;
1963 struct conn *c;
1964 int i, j;
1965
1966 vlog("House keeping entered\n");
1967
1968 bytes = 0;
1969 for (i = 0; i < nr_conns; i++) {
1970 c = &conns[i];
1971
1972 for (j = 0; j < 2; j++) {
1973 struct conn_dir *cd = &c->cd[j];
1974
1975 bytes += cd->in_bytes + cd->out_bytes;
1976 }
1977 if (c->flags & CONN_F_DISCONNECTED) {
1978 vlog("%d: disconnected\n", i);
1979
1980 if (!(c->flags & CONN_F_REAPED)) {
1981 void *ret;
1982
1983 pthread_join(c->thread, &ret);
1984 c->flags |= CONN_F_REAPED;
1985 }
1986 continue;
1987 }
1988 if (c->flags & CONN_F_DISCONNECTING)
1989 continue;
1990
1991 if (should_shutdown(c)) {
1992 __close_conn(ring, c);
1993 c->flags |= CONN_F_DISCONNECTING;
1994 }
1995 }
1996
1997 elapsed = mtime_since_now(&last_housekeeping);
1998 if (bytes && elapsed >= 900) {
1999 unsigned long bw;
2000
2001 bw = (8 * (bytes - last_bytes) / 1000UL) / elapsed;
2002 if (bw) {
2003 if (open_conns)
2004 printf("Bandwidth (threads=%d): %'luMbit\n", open_conns, bw);
2005 gettimeofday(&last_housekeeping, NULL);
2006 last_bytes = bytes;
2007 }
2008 }
2009 }
2010
2011 /*
2012 * Event loop shared between the parent, and the connections. Could be
2013 * split in two, as they don't handle the same types of events. For the per
2014 * connection loop, 'c' is valid. For the main loop, it's NULL.
2015 */
__event_loop(struct io_uring * ring,struct conn * c)2016 static int __event_loop(struct io_uring *ring, struct conn *c)
2017 {
2018 struct __kernel_timespec active_ts, idle_ts;
2019 int flags;
2020
2021 idle_ts.tv_sec = 0;
2022 idle_ts.tv_nsec = 100000000LL;
2023 active_ts = idle_ts;
2024 if (wait_usec > 1000000) {
2025 active_ts.tv_sec = wait_usec / 1000000;
2026 wait_usec -= active_ts.tv_sec * 1000000;
2027 }
2028 active_ts.tv_nsec = wait_usec * 1000;
2029
2030 gettimeofday(&last_housekeeping, NULL);
2031
2032 flags = 0;
2033 while (1) {
2034 struct __kernel_timespec *ts = &idle_ts;
2035 struct io_uring_cqe *cqe;
2036 unsigned int head;
2037 int ret, i, to_wait;
2038
2039 /*
2040 * If wait_batch is set higher than 1, then we'll wait on
2041 * that amount of CQEs to be posted each loop. If used with
2042 * DEFER_TASKRUN, this can provide a substantial reduction
2043 * in context switch rate as the task isn't woken until the
2044 * requested number of events can be returned.
2045 *
2046 * Can be used with -t to set a wait_usec timeout as well.
2047 * For example, if an application can deal with 250 usec
2048 * of wait latencies, it can set -w8 -t250 which will cause
2049 * io_uring to return when either 8 events have been received,
2050 * or if 250 usec of waiting has passed.
2051 *
2052 * If we don't have any open connections, wait on just 1
2053 * always.
2054 */
2055 to_wait = 1;
2056 if (open_conns && !flags) {
2057 ts = &active_ts;
2058 to_wait = wait_batch;
2059 }
2060
2061 vlog("Submit and wait for %d\n", to_wait);
2062 ret = io_uring_submit_and_wait_timeout(ring, &cqe, to_wait, ts, NULL);
2063
2064 if (*ring->cq.koverflow)
2065 printf("overflow %u\n", *ring->cq.koverflow);
2066 if (*ring->sq.kflags & IORING_SQ_CQ_OVERFLOW)
2067 printf("saw overflow\n");
2068
2069 vlog("Submit and wait: %d\n", ret);
2070
2071 i = flags = 0;
2072 io_uring_for_each_cqe(ring, head, cqe) {
2073 if (handle_cqe(ring, cqe))
2074 return 1;
2075 flags |= cqe_to_conn(cqe)->flags;
2076 ++i;
2077 }
2078
2079 vlog("Handled %d events\n", i);
2080
2081 /*
2082 * Advance the CQ ring for seen events when we've processed
2083 * all of them in this loop. This can also be done with
2084 * io_uring_cqe_seen() in each handler above, which just marks
2085 * that single CQE as seen. However, it's more efficient to
2086 * mark a batch as seen when we're done with that batch.
2087 */
2088 if (i) {
2089 io_uring_cq_advance(ring, i);
2090 events += i;
2091 }
2092
2093 event_loops++;
2094 if (c) {
2095 if (c->flags & CONN_F_DISCONNECTED)
2096 break;
2097 } else {
2098 house_keeping(ring);
2099 }
2100 }
2101
2102 return 0;
2103 }
2104
2105 /*
2106 * Main event loop, Submit our multishot accept request, and then just loop
2107 * around handling incoming connections.
2108 */
parent_loop(struct io_uring * ring,int fd)2109 static int parent_loop(struct io_uring *ring, int fd)
2110 {
2111 struct io_uring_sqe *sqe;
2112
2113 /*
2114 * proxy provides a way to use either multishot receive or not, but
2115 * for accept, we always use multishot. A multishot accept request
2116 * needs only be armed once, and then it'll trigger a completion and
2117 * post a CQE whenever a new connection is accepted. No need to do
2118 * anything else, unless the multishot accept terminates. This happens
2119 * if it encounters an error. Applications should check for
2120 * IORING_CQE_F_MORE in cqe->flags - this tells you if more completions
2121 * are expected from this request or not. Non-multishot never have
2122 * this set, where multishot will always have this set unless an error
2123 * occurs.
2124 */
2125 sqe = get_sqe(ring);
2126 if (fixed_files)
2127 io_uring_prep_multishot_accept_direct(sqe, fd, NULL, NULL, 0);
2128 else
2129 io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
2130 __encode_userdata(sqe, 0, __ACCEPT, 0, fd);
2131
2132 return __event_loop(ring, NULL);
2133 }
2134
init_ring(struct io_uring * ring,int nr_files)2135 static int init_ring(struct io_uring *ring, int nr_files)
2136 {
2137 struct io_uring_params params;
2138 int ret;
2139
2140 /*
2141 * By default, set us up with a big CQ ring. Not strictly needed
2142 * here, but it's very important to never overflow the CQ ring.
2143 * Events will not be dropped if this happens, but it does slow
2144 * the application down in dealing with overflown events.
2145 *
2146 * Set SINGLE_ISSUER, which tells the kernel that only one thread
2147 * is doing IO submissions. This enables certain optimizations in
2148 * the kernel.
2149 */
2150 memset(¶ms, 0, sizeof(params));
2151 params.flags |= IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_CLAMP;
2152 params.flags |= IORING_SETUP_CQSIZE;
2153 params.cq_entries = 1024;
2154
2155 /*
2156 * If use_huge is set, setup the ring with IORING_SETUP_NO_MMAP. This
2157 * means that the application allocates the memory for the ring, and
2158 * the kernel maps it. The alternative is having the kernel allocate
2159 * the memory, and then liburing will mmap it. But we can't really
2160 * support huge pages that way. If this fails, then ensure that the
2161 * system has huge pages set aside upfront.
2162 */
2163 if (use_huge)
2164 params.flags |= IORING_SETUP_NO_MMAP;
2165
2166 /*
2167 * DEFER_TASKRUN decouples async event reaping and retrying from
2168 * regular system calls. If this isn't set, then io_uring uses
2169 * normal task_work for this. task_work is always being run on any
2170 * exit to userspace. Real applications do more than just call IO
2171 * related system calls, and hence we can be running this work way
2172 * too often. Using DEFER_TASKRUN defers any task_work running to
2173 * when the application enters the kernel anyway to wait on new
2174 * events. It's generally the preferred and recommended way to setup
2175 * a ring.
2176 */
2177 if (defer_tw) {
2178 params.flags |= IORING_SETUP_DEFER_TASKRUN;
2179 sqpoll = 0;
2180 }
2181
2182 /*
2183 * SQPOLL offloads any request submission and retry operations to a
2184 * dedicated thread. This enables an application to do IO without
2185 * ever having to enter the kernel itself. The SQPOLL thread will
2186 * stay busy as long as there's work to do, and go to sleep if
2187 * sq_thread_idle msecs have passed. If it's running, submitting new
2188 * IO just needs to make them visible to the SQPOLL thread, it needs
2189 * not enter the kernel. For submission, the application will only
2190 * enter the kernel if the SQPOLL has been idle long enough that it
2191 * has gone to sleep.
2192 *
2193 * Waiting on events still need to enter the kernel, if none are
2194 * available. The application may also use io_uring_peek_cqe() to
2195 * check for new events without entering the kernel, as completions
2196 * will be continually produced to the CQ ring by the SQPOLL thread
2197 * as they occur.
2198 */
2199 if (sqpoll) {
2200 params.flags |= IORING_SETUP_SQPOLL;
2201 params.sq_thread_idle = 1000;
2202 defer_tw = 0;
2203 }
2204
2205 /*
2206 * If neither DEFER_TASKRUN or SQPOLL is used, set COOP_TASKRUN. This
2207 * avoids heavy signal based notifications, which can force an
2208 * application to enter the kernel and process it as soon as they
2209 * occur.
2210 */
2211 if (!sqpoll && !defer_tw)
2212 params.flags |= IORING_SETUP_COOP_TASKRUN;
2213
2214 /*
2215 * The SQ ring size need not be larger than any batch of requests
2216 * that need to be prepared before submit. Normally in a loop we'd
2217 * only need a few, if any, particularly if multishot is used.
2218 */
2219 ret = io_uring_queue_init_params(ring_size, ring, ¶ms);
2220 if (ret) {
2221 fprintf(stderr, "%s\n", strerror(-ret));
2222 return 1;
2223 }
2224
2225 /*
2226 * If send serialization is available and no option was given to use
2227 * it or not, default it to on. If it was turned on and the kernel
2228 * doesn't support it, turn it off.
2229 */
2230 if (params.features & IORING_FEAT_SEND_BUF_SELECT) {
2231 if (send_ring == -1)
2232 send_ring = 1;
2233 } else {
2234 if (send_ring == 1) {
2235 fprintf(stderr, "Kernel doesn't support ring provided "
2236 "buffers for sends, disabled\n");
2237 }
2238 send_ring = 0;
2239 }
2240
2241 if (!send_ring && snd_bundle) {
2242 fprintf(stderr, "Can't use send bundle without send_ring\n");
2243 snd_bundle = 0;
2244 }
2245
2246 if (fixed_files) {
2247 /*
2248 * If fixed files are used, we need to allocate a fixed file
2249 * table upfront where new direct descriptors can be managed.
2250 */
2251 ret = io_uring_register_files_sparse(ring, nr_files);
2252 if (ret) {
2253 fprintf(stderr, "file register: %d\n", ret);
2254 return 1;
2255 }
2256
2257 /*
2258 * If fixed files are used, we also register the ring fd. See
2259 * comment near io_uring_prep_socket_direct_alloc() further
2260 * down. This avoids the fget/fput overhead associated with
2261 * the io_uring_enter(2) system call itself, which is used to
2262 * submit and wait on events.
2263 */
2264 ret = io_uring_register_ring_fd(ring);
2265 if (ret != 1) {
2266 fprintf(stderr, "ring register: %d\n", ret);
2267 return 1;
2268 }
2269 }
2270
2271 if (napi) {
2272 struct io_uring_napi n = {
2273 .prefer_busy_poll = napi > 1 ? 1 : 0,
2274 .busy_poll_to = napi_timeout,
2275 };
2276
2277 ret = io_uring_register_napi(ring, &n);
2278 if (ret) {
2279 fprintf(stderr, "io_uring_register_napi: %d\n", ret);
2280 if (ret != -EINVAL)
2281 return 1;
2282 fprintf(stderr, "NAPI not available, turned off\n");
2283 }
2284 }
2285
2286 return 0;
2287 }
2288
thread_main(void * data)2289 static void *thread_main(void *data)
2290 {
2291 struct conn *c = data;
2292 int ret;
2293
2294 c->flags |= CONN_F_STARTED;
2295
2296 /* we need a max of 4 descriptors for each client */
2297 ret = init_ring(&c->ring, 4);
2298 if (ret)
2299 goto done;
2300
2301 if (setup_buffer_rings(&c->ring, c))
2302 goto done;
2303
2304 /*
2305 * If we're using fixed files, then we need to wait for the parent
2306 * to install the c->in_fd into our direct descriptor table. When
2307 * that happens, we'll set things up. If we're not using fixed files,
2308 * we can set up the receive or connect now.
2309 */
2310 if (!fixed_files)
2311 open_socket(c);
2312
2313 /* we're ready */
2314 pthread_barrier_wait(&c->startup_barrier);
2315
2316 __event_loop(&c->ring, c);
2317 done:
2318 return NULL;
2319 }
2320
usage(const char * name)2321 static void usage(const char *name)
2322 {
2323 printf("%s:\n", name);
2324 printf("\t-m:\t\tUse multishot receive (%d)\n", recv_mshot);
2325 printf("\t-d:\t\tUse DEFER_TASKRUN (%d)\n", defer_tw);
2326 printf("\t-S:\t\tUse SQPOLL (%d)\n", sqpoll);
2327 printf("\t-f:\t\tUse only fixed files (%d)\n", fixed_files);
2328 printf("\t-a:\t\tUse huge pages for the ring (%d)\n", use_huge);
2329 printf("\t-t:\t\tTimeout for waiting on CQEs (usec) (%d)\n", wait_usec);
2330 printf("\t-w:\t\tNumber of CQEs to wait for each loop (%d)\n", wait_batch);
2331 printf("\t-B:\t\tUse bi-directional mode (%d)\n", bidi);
2332 printf("\t-s:\t\tAct only as a sink (%d)\n", is_sink);
2333 printf("\t-q:\t\tRing size to use (%d)\n", ring_size);
2334 printf("\t-H:\t\tHost to connect to (%s)\n", host);
2335 printf("\t-r:\t\tPort to receive on (%d)\n", receive_port);
2336 printf("\t-p:\t\tPort to connect to (%d)\n", send_port);
2337 printf("\t-6:\t\tUse IPv6 (%d)\n", ipv6);
2338 printf("\t-N:\t\tUse NAPI polling (%d)\n", napi);
2339 printf("\t-T:\t\tNAPI timeout (usec) (%d)\n", napi_timeout);
2340 printf("\t-b:\t\tSend/receive buf size (%d)\n", buf_size);
2341 printf("\t-n:\t\tNumber of provided buffers (pow2) (%d)\n", nr_bufs);
2342 printf("\t-u:\t\tUse provided buffers for send (%d)\n", send_ring);
2343 printf("\t-C:\t\tUse bundles for send (%d)\n", snd_bundle);
2344 printf("\t-z:\t\tUse zerocopy send (%d)\n", snd_zc);
2345 printf("\t-c:\t\tUse bundles for recv (%d)\n", snd_bundle);
2346 printf("\t-M:\t\tUse sendmsg (%d)\n", snd_msg);
2347 printf("\t-M:\t\tUse recvmsg (%d)\n", rcv_msg);
2348 printf("\t-x:\t\tShow extended stats (%d)\n", ext_stat);
2349 printf("\t-V:\t\tIncrease verbosity (%d)\n", verbose);
2350 }
2351
2352 /*
2353 * Options parsing the ring / net setup
2354 */
main(int argc,char * argv[])2355 int main(int argc, char *argv[])
2356 {
2357 struct io_uring ring;
2358 struct sigaction sa = { };
2359 const char *optstring;
2360 int opt, ret, fd;
2361
2362 setlocale(LC_NUMERIC, "en_US");
2363
2364 page_size = sysconf(_SC_PAGESIZE);
2365 if (page_size < 0) {
2366 perror("sysconf(_SC_PAGESIZE)");
2367 return 1;
2368 }
2369
2370 pthread_mutex_init(&thread_lock, NULL);
2371
2372 optstring = "m:d:S:s:b:f:H:r:p:n:B:N:T:w:t:M:R:u:c:C:q:a:x:z:i:6Vh?";
2373 while ((opt = getopt(argc, argv, optstring)) != -1) {
2374 switch (opt) {
2375 case 'm':
2376 recv_mshot = !!atoi(optarg);
2377 break;
2378 case 'S':
2379 sqpoll = !!atoi(optarg);
2380 break;
2381 case 'd':
2382 defer_tw = !!atoi(optarg);
2383 break;
2384 case 'b':
2385 buf_size = atoi(optarg);
2386 break;
2387 case 'n':
2388 nr_bufs = atoi(optarg);
2389 break;
2390 case 'u':
2391 send_ring = !!atoi(optarg);
2392 break;
2393 case 'c':
2394 rcv_bundle = !!atoi(optarg);
2395 break;
2396 case 'C':
2397 snd_bundle = !!atoi(optarg);
2398 break;
2399 case 'w':
2400 wait_batch = atoi(optarg);
2401 break;
2402 case 't':
2403 wait_usec = atoi(optarg);
2404 break;
2405 case 's':
2406 is_sink = !!atoi(optarg);
2407 break;
2408 case 'f':
2409 fixed_files = !!atoi(optarg);
2410 break;
2411 case 'H':
2412 host = strdup(optarg);
2413 break;
2414 case 'r':
2415 receive_port = atoi(optarg);
2416 break;
2417 case 'p':
2418 send_port = atoi(optarg);
2419 break;
2420 case 'B':
2421 bidi = !!atoi(optarg);
2422 break;
2423 case 'N':
2424 napi = !!atoi(optarg);
2425 break;
2426 case 'T':
2427 napi_timeout = atoi(optarg);
2428 break;
2429 case '6':
2430 ipv6 = true;
2431 break;
2432 case 'M':
2433 snd_msg = !!atoi(optarg);
2434 break;
2435 case 'z':
2436 snd_zc = !!atoi(optarg);
2437 break;
2438 case 'R':
2439 rcv_msg = !!atoi(optarg);
2440 break;
2441 case 'q':
2442 ring_size = atoi(optarg);
2443 break;
2444 case 'i':
2445 buf_ring_inc = !!atoi(optarg);
2446 break;
2447 case 'a':
2448 use_huge = !!atoi(optarg);
2449 break;
2450 case 'x':
2451 ext_stat = !!atoi(optarg);
2452 break;
2453 case 'V':
2454 verbose++;
2455 break;
2456 case 'h':
2457 default:
2458 usage(argv[0]);
2459 return 1;
2460 }
2461 }
2462
2463 if (bidi && is_sink) {
2464 fprintf(stderr, "Can't be both bidi proxy and sink\n");
2465 return 1;
2466 }
2467 if (snd_msg && sqpoll) {
2468 fprintf(stderr, "SQPOLL with msg variants disabled\n");
2469 snd_msg = 0;
2470 }
2471 if (rcv_msg && rcv_bundle) {
2472 fprintf(stderr, "Can't use bundles with recvmsg\n");
2473 rcv_msg = 0;
2474 }
2475 if (snd_msg && snd_bundle) {
2476 fprintf(stderr, "Can't use bundles with sendmsg\n");
2477 snd_msg = 0;
2478 }
2479 if (snd_msg && send_ring) {
2480 fprintf(stderr, "Can't use send ring sendmsg\n");
2481 snd_msg = 0;
2482 }
2483 if (snd_zc && (send_ring || snd_bundle)) {
2484 fprintf(stderr, "Can't use send zc with bundles or ring\n");
2485 send_ring = snd_bundle = 0;
2486 }
2487 /*
2488 * For recvmsg w/multishot, we waste some data at the head of the
2489 * packet every time. Adjust the buffer size to account for that,
2490 * so we're still handing 'buf_size' actual payload of data.
2491 */
2492 if (rcv_msg && recv_mshot) {
2493 fprintf(stderr, "Adjusted buf size for recvmsg w/multishot\n");
2494 buf_size += sizeof(struct io_uring_recvmsg_out);
2495 }
2496
2497 br_mask = nr_bufs - 1;
2498
2499 fd = setup_listening_socket(receive_port, ipv6);
2500 if (is_sink)
2501 send_port = -1;
2502
2503 if (fd == -1)
2504 return 1;
2505
2506 atexit(show_stats);
2507 sa.sa_handler = sig_int;
2508 sa.sa_flags = SA_RESTART;
2509 sigaction(SIGINT, &sa, NULL);
2510
2511 ret = init_ring(&ring, MAX_CONNS * 3);
2512 if (ret)
2513 return ret;
2514
2515 printf("Backend: sqpoll=%d, defer_tw=%d, fixed_files=%d, "
2516 "is_sink=%d, buf_size=%d, nr_bufs=%d, host=%s, send_port=%d, "
2517 "receive_port=%d, napi=%d, napi_timeout=%d, huge_page=%d\n",
2518 sqpoll, defer_tw, fixed_files, is_sink,
2519 buf_size, nr_bufs, host, send_port, receive_port,
2520 napi, napi_timeout, use_huge);
2521 printf(" recv options: recvmsg=%d, recv_mshot=%d, recv_bundle=%d\n",
2522 rcv_msg, recv_mshot, rcv_bundle);
2523 printf(" send options: sendmsg=%d, send_ring=%d, send_bundle=%d, "
2524 "send_zerocopy=%d\n", snd_msg, send_ring, snd_bundle,
2525 snd_zc);
2526
2527 return parent_loop(&ring, fd);
2528 }
2529