From 8a68ee510f5da20edf7fa06da701713ef10db930 Mon Sep 17 00:00:00 2001 From: jiangheng12 Date: Thu, 16 Mar 2023 19:59:26 +0800 Subject: [PATCH] same node & gazellectl -a --- src/api/sockets.c | 21 +++++++++++++++++++++ src/core/ipv4/ip4_frag.c | 4 ++++ src/core/netif.c | 7 ++++--- src/core/pbuf.c | 6 ++++++ src/core/tcp.c | 39 +++++++++++++++++++++++++++++++++++++++ src/core/tcp_in.c | 6 ++++++ src/core/tcp_out.c | 11 +++++++++++ src/include/lwip/pbuf.h | 3 +++ src/include/lwip/tcp.h | 10 ++++++++++ src/include/lwipopts.h | 7 +++++++ src/include/lwipsock.h | 37 +++++++++++++++++++++++++++++++++++++ 11 files changed, 148 insertions(+), 3 deletions(-) diff --git a/src/api/sockets.c b/src/api/sockets.c index 356e345..7a5da26 100644 --- a/src/api/sockets.c +++ b/src/api/sockets.c @@ -605,6 +605,10 @@ alloc_socket(struct netconn *newconn, int accepted, int flags) * (unless it has been created by accept()). */ sockets[i].sendevent = (NETCONNTYPE_GROUP(newconn->type) == NETCONN_TCP ? (accepted != 0) : 1); sockets[i].errevent = 0; + sockets[i].same_node_rx_ring = NULL; + sockets[i].same_node_rx_ring_mz = NULL; + sockets[i].same_node_tx_ring = NULL; + sockets[i].same_node_tx_ring_mz = NULL; return i + LWIP_SOCKET_OFFSET; } else { lwip_close(i); @@ -716,6 +720,11 @@ free_socket(struct lwip_sock *sock, int is_tcp) /* Protect socket array */ SYS_ARCH_PROTECT(lev); +#if GAZELLE_ENABLE + /* remove sock from same_node_recv_lit */ + list_del_node_null(&sock->recv_list); +#endif + freed = free_socket_locked(sock, is_tcp, &conn, &lastdata); SYS_ARCH_UNPROTECT(lev); /* don't use 'sock' after this line, as another task might have allocated it */ @@ -780,6 +789,18 @@ lwip_accept4(int s, struct sockaddr *addr, socklen_t *addrlen, int flags) LWIP_ASSERT("invalid socket index", (newsock >= LWIP_SOCKET_OFFSET) && (newsock < NUM_SOCKETS + LWIP_SOCKET_OFFSET)); #endif /* GAZELLE_ENABLE */ nsock = &sockets[newsock - LWIP_SOCKET_OFFSET]; +#if GAZELLE_ENABLE + struct tcp_pcb *pcb = newconn->pcb.tcp; + if (pcb->client_rx_ring != NULL && pcb->client_tx_ring != NULL) { + if (find_same_node_memzone(pcb, nsock) != 0) { + netconn_delete(newconn); + free_socket(nsock, 1); + sock_set_errno(sock, ENOTCONN); + done_socket(sock); + return -1; + } + } +#endif /* See event_callback: If data comes in right away after an accept, even * though the server task might not have created a new socket yet. diff --git a/src/core/ipv4/ip4_frag.c b/src/core/ipv4/ip4_frag.c index f15b798..e01ea51 100644 --- a/src/core/ipv4/ip4_frag.c +++ b/src/core/ipv4/ip4_frag.c @@ -729,6 +729,7 @@ ip_frag_free_pbuf_custom_ref(struct pbuf_custom_ref *p) /** Free-callback function to free a 'struct pbuf_custom_ref', called by * pbuf_free. */ +#if !GAZELLE_ENABLE static void ipfrag_free_pbuf_custom(struct pbuf *p) { @@ -740,6 +741,7 @@ ipfrag_free_pbuf_custom(struct pbuf *p) } ip_frag_free_pbuf_custom_ref(pcr); } +#endif #endif /* !LWIP_NETIF_TX_SINGLE_PBUF */ /** @@ -851,7 +853,9 @@ ip4_frag(struct pbuf *p, struct netif *netif, const ip4_addr_t *dest) } pbuf_ref(p); pcr->original = p; +#if !GAZELLE_ENABLE pcr->pc.custom_free_function = ipfrag_free_pbuf_custom; +#endif /* Add it to end of rambuf's chain, but using pbuf_cat, not pbuf_chain * so that it is removed when pbuf_dechain is later called on rambuf. diff --git a/src/core/netif.c b/src/core/netif.c index 70392cb..86b74a0 100644 --- a/src/core/netif.c +++ b/src/core/netif.c @@ -1065,7 +1065,7 @@ netif_set_link_callback(struct netif *netif, netif_status_callback_fn link_callb } #endif /* LWIP_NETIF_LINK_CALLBACK */ -#if ENABLE_LOOPBACK +#if !GAZELLE_ENABLE /** * @ingroup netif * Send an IP packet to be received on the same netif (loopif-like). @@ -1184,6 +1184,7 @@ netif_loop_output(struct netif *netif, struct pbuf *p) return ERR_OK; } +#endif #if LWIP_HAVE_LOOPIF #if LWIP_IPV4 @@ -1205,7 +1206,7 @@ netif_loop_output_ipv6(struct netif *netif, struct pbuf *p, const ip6_addr_t *ad #endif /* LWIP_IPV6 */ #endif /* LWIP_HAVE_LOOPIF */ - +#if !GAZELLE_ENABLE /** * Call netif_poll() in the main loop of your application. This is to prevent * reentering non-reentrant functions like tcp_input(). Packets passed to @@ -1277,6 +1278,7 @@ netif_poll(struct netif *netif) } SYS_ARCH_UNPROTECT(lev); } +#endif #if !LWIP_NETIF_LOOPBACK_MULTITHREADING /** @@ -1292,7 +1294,6 @@ netif_poll_all(void) } } #endif /* !LWIP_NETIF_LOOPBACK_MULTITHREADING */ -#endif /* ENABLE_LOOPBACK */ #if LWIP_NUM_NETIF_CLIENT_DATA > 0 /** diff --git a/src/core/pbuf.c b/src/core/pbuf.c index dd71519..2385e57 100644 --- a/src/core/pbuf.c +++ b/src/core/pbuf.c @@ -69,6 +69,7 @@ */ #include "lwip/opt.h" +#include "lwipsock.h" #include "lwip/pbuf.h" #include "lwip/stats.h" @@ -189,6 +190,7 @@ pbuf_init_alloced_pbuf(struct pbuf *p, void *payload, u16_t tot_len, u16_t len, p->flags = flags; p->ref = 1; p->if_idx = NETIF_NO_INDEX; + p->pcb = NULL; } /** @@ -777,9 +779,13 @@ pbuf_free(struct pbuf *p) #if LWIP_SUPPORT_CUSTOM_PBUF /* is this a custom pbuf? */ if ((p->flags & PBUF_FLAG_IS_CUSTOM) != 0) { +#if GAZELLE_ENABLE + gazelle_free_pbuf(p); +#else struct pbuf_custom *pc = (struct pbuf_custom *)p; LWIP_ASSERT("pc->custom_free_function != NULL", pc->custom_free_function != NULL); pc->custom_free_function(p); +#endif } else #endif /* LWIP_SUPPORT_CUSTOM_PBUF */ { diff --git a/src/core/tcp.c b/src/core/tcp.c index 69a39f6..538a664 100644 --- a/src/core/tcp.c +++ b/src/core/tcp.c @@ -116,6 +116,8 @@ #include #include +#include "lwipsock.h" + #ifdef LWIP_HOOK_FILENAME #include LWIP_HOOK_FILENAME #endif @@ -250,6 +252,18 @@ void tcp_free(struct tcp_pcb *pcb) { #if GAZELLE_ENABLE + if (pcb->free_ring == 1) { + struct netconn *netconn = NULL; + struct lwip_sock *sock = NULL; + rte_ring_free(pcb->client_rx_ring); + rte_ring_free(pcb->client_tx_ring); + netconn = (struct netconn *)pcb->callback_arg; + sock = get_socket(netconn->socket); + rte_memzone_free(sock->same_node_rx_ring->mz); + rte_memzone_free(sock->same_node_rx_ring_mz); + rte_memzone_free(sock->same_node_tx_ring->mz); + rte_memzone_free(sock->same_node_tx_ring_mz); + } vdev_unreg_done(pcb); release_port(pcb->local_port); #endif @@ -996,6 +1010,15 @@ tcp_listen_with_backlog_and_err(struct tcp_pcb *pcb, u8_t backlog, err_t *err) /* pcb transfer to lpcb and reg into tcp_listen_pcbs. freeing pcb shouldn't release sock table in here. * local_port=0 avoid to release sock table in tcp_free */ pcb->local_port = 0; + + char name[RING_NAME_LEN]; + snprintf(name, sizeof(name), "listen_rx_ring_%u", lpcb->local_port); + if (rte_ring_lookup(name) != NULL) { + /* port reuse */ + lpcb->listen_rx_ring = NULL; + } else { + same_node_ring_create(&lpcb->listen_rx_ring, SAME_NODE_RING_SIZE, lpcb->local_port, "listen", "rx"); + } #endif tcp_free(pcb); #if LWIP_CALLBACK_API @@ -1262,6 +1285,16 @@ tcp_connect(struct tcp_pcb *pcb, const ip_addr_t *ipaddr, u16_t port, #endif /* SO_REUSE */ } +#if GAZELLE_ENABLE + /* communication between processes on the same node */ + if (ip_addr_cmp(&pcb->local_ip, &pcb->remote_ip)) { + ret = create_same_node_ring(pcb); + if (ret != 0) { + return ret; + } + } +#endif + iss = tcp_next_iss(pcb); pcb->rcv_nxt = 0; pcb->snd_nxt = iss; @@ -2090,7 +2123,13 @@ tcp_alloc(u8_t prio) pcb->keep_intvl = TCP_KEEPINTVL_DEFAULT; pcb->keep_cnt = TCP_KEEPCNT_DEFAULT; #endif /* LWIP_TCP_KEEPALIVE */ +#if GAZELLE_ENABLE + pcb->client_rx_ring = NULL; + pcb->client_tx_ring = NULL; + pcb->free_ring = 0; +#endif } + return pcb; } diff --git a/src/core/tcp_in.c b/src/core/tcp_in.c index dd83260..719cf04 100644 --- a/src/core/tcp_in.c +++ b/src/core/tcp_in.c @@ -42,6 +42,7 @@ */ #include "lwip/opt.h" +#include "lwipsock.h" #if LWIP_TCP /* don't build if not configured for use in lwipopts.h */ @@ -806,6 +807,11 @@ tcp_listen_input(struct tcp_pcb_listen *pcb) #if GAZELLE_ENABLE vdev_reg_done(REG_RING_TCP_CONNECT, npcb); + if (ip_addr_cmp(&npcb->local_ip, &npcb->remote_ip)) { + if (find_same_node_ring(npcb) != 0) { + return; + } + } #endif /* Parse any options in the SYN. */ diff --git a/src/core/tcp_out.c b/src/core/tcp_out.c index 8100e18..b1c317d 100644 --- a/src/core/tcp_out.c +++ b/src/core/tcp_out.c @@ -725,6 +725,10 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u16_t len, u8_t apiflags) goto memerr; } +#if GAZELLE_ENABLE + lstack_calculate_aggregate(2, p->tot_len); +#endif + if ((seg = tcp_create_segment(pcb, p, 0, pcb->snd_lbb + pos, optflags)) == NULL) { #if GAZELLE_ENABLE if (pos > 0) { @@ -1705,6 +1709,10 @@ tcp_output_segment(struct tcp_seg *seg, struct tcp_pcb *pcb, struct netif *netif int seg_chksum_was_swapped = 0; #endif +#if USE_LIBOS + lstack_calculate_aggregate(1, seg->len); +#endif + LWIP_ASSERT("tcp_output_segment: invalid seg", seg != NULL); LWIP_ASSERT("tcp_output_segment: invalid pcb", pcb != NULL); LWIP_ASSERT("tcp_output_segment: invalid netif", netif != NULL); @@ -1899,6 +1907,8 @@ tcp_output_segment(struct tcp_seg *seg, struct tcp_pcb *pcb, struct netif *netif PERF_START(PERF_LAYER_IP, PERF_POINT_IP_SEND); NETIF_SET_HINTS(netif, &(pcb->netif_hints)); + + seg->p->pcb = pcb; err = ip_output_if(seg->p, &pcb->local_ip, &pcb->remote_ip, pcb->ttl, pcb->tos, IP_PROTO_TCP, netif); NETIF_RESET_HINTS(netif); @@ -2236,6 +2246,7 @@ tcp_output_control_segment(struct tcp_pcb *pcb, struct pbuf *p, err_t err; struct netif *netif; + p->pcb = pcb; LWIP_ASSERT("tcp_output_control_segment: invalid pbuf", p != NULL); if (pcb == NULL || pcb->pcb_if == NULL) { diff --git a/src/include/lwip/pbuf.h b/src/include/lwip/pbuf.h index 6c4ca44..9321afc 100644 --- a/src/include/lwip/pbuf.h +++ b/src/include/lwip/pbuf.h @@ -235,6 +235,7 @@ struct pbuf { u8_t head; struct pbuf *last; pthread_spinlock_t pbuf_lock; + struct tcp_pcb *pcb; #endif /* GAZELLE_ENABLE CHECKSUM_OFFLOAD_SWITCH */ /** In case the user needs to store data custom data on a pbuf */ @@ -263,7 +264,9 @@ struct pbuf_custom { /** The actual pbuf */ struct pbuf pbuf; /** This function is called when pbuf_free deallocates this pbuf(_custom) */ +#if !GAZELLE_ENABLE pbuf_free_custom_fn custom_free_function; +#endif }; #endif /* LWIP_SUPPORT_CUSTOM_PBUF */ diff --git a/src/include/lwip/tcp.h b/src/include/lwip/tcp.h index b822f40..e13099c 100644 --- a/src/include/lwip/tcp.h +++ b/src/include/lwip/tcp.h @@ -260,6 +260,9 @@ struct tcp_pcb_listen { u8_t master_lpcb; #endif +#if GAZELLE_ENABLE + struct rte_ring *listen_rx_ring; +#endif }; @@ -418,6 +421,13 @@ struct tcp_pcb { u8_t rcv_scale; #endif +#if GAZELLE_ENABLE +#define SAME_NODE_RING_SIZE 512 + struct rte_ring *client_rx_ring; + struct rte_ring *client_tx_ring; + u8_t free_ring; +#endif + u8_t need_tso_send; }; diff --git a/src/include/lwipopts.h b/src/include/lwipopts.h index 414ead4..0d2a6d9 100644 --- a/src/include/lwipopts.h +++ b/src/include/lwipopts.h @@ -235,4 +235,11 @@ #define SIOCSHIWAT 1 +/* + ------------------------------------ + ---------- Netif options ---------- + ------------------------------------ +*/ +#define LWIP_NETIF_LOOPBACK 1 + #endif /* __LWIPOPTS_H__ */ diff --git a/src/include/lwipsock.h b/src/include/lwipsock.h index 7e16ec8..f917d8a 100644 --- a/src/include/lwipsock.h +++ b/src/include/lwipsock.h @@ -65,7 +65,19 @@ struct protocol_stack; struct wakeup_poll; struct rte_ring; #include +#include + +// 8M +#define SAME_NODE_RING_LEN (unsigned long long)(8388608) +#define SAME_NODE_RING_MASK (unsigned long long)(8388608 - 1) +#define RING_NAME_LEN 32 +struct same_node_ring { + const struct rte_memzone *mz; + unsigned long long sndbegin; + unsigned long long sndend; +}; #endif + /** Contains all internal pointers and states used for a socket */ struct lwip_sock { /** sockets currently are built on netconns, each socket has one netconn */ @@ -120,9 +132,25 @@ struct lwip_sock { struct protocol_stack *stack; struct rte_ring *recv_ring; struct rte_ring *send_ring; + + /* same node send data ring */ + struct same_node_ring *same_node_rx_ring; + const struct rte_memzone *same_node_rx_ring_mz; + struct same_node_ring *same_node_tx_ring; + const struct rte_memzone *same_node_tx_ring_mz; #endif }; +#if GAZELLE_ENABLE +static inline unsigned same_node_ring_count(struct lwip_sock *sock) +{ + const unsigned long long cur_begin = __atomic_load_n(&sock->same_node_rx_ring->sndbegin, __ATOMIC_RELAXED); + const unsigned long long cur_end = __atomic_load_n(&sock->same_node_rx_ring->sndend, __ATOMIC_RELAXED); + + return cur_end - cur_begin; +} +#endif + #ifndef set_errno #define set_errno(err) do { if (err) { errno = (err); } } while(0) #endif @@ -142,6 +170,15 @@ extern struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size extern void gazelle_init_sock(int32_t fd); extern void gazelle_clean_sock(int32_t fd); extern void write_lwip_over(struct lwip_sock *sock); +extern void netif_poll(struct netif *netif); +extern err_t netif_loop_output(struct netif *netif, struct pbuf *p); +extern err_t find_same_node_memzone(struct tcp_pcb *pcb, struct lwip_sock *nsock); +extern err_t same_node_memzone_create(const struct rte_memzone **zone, int size, int port, char *name, char *); +extern err_t same_node_ring_create(struct rte_ring **ring, int size, int port, char *name, char *rx); +extern err_t create_same_node_ring(struct tcp_pcb *pcb); +extern err_t find_same_node_ring(struct tcp_pcb *pcb); +extern void gazelle_free_pbuf(struct pbuf *pbuf); +extern void lstack_calculate_aggregate(int type, uint32_t len); #endif /* GAZELLE_ENABLE */ struct lwip_sock *get_socket(int s); -- 2.23.0