• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1From 8a68ee510f5da20edf7fa06da701713ef10db930 Mon Sep 17 00:00:00 2001
2From: jiangheng12 <jiangheng14@huawei.com>
3Date: Thu, 16 Mar 2023 19:59:26 +0800
4Subject: [PATCH] same node & gazellectl -a
5
6---
7 src/api/sockets.c        | 21 +++++++++++++++++++++
8 src/core/ipv4/ip4_frag.c |  4 ++++
9 src/core/netif.c         |  7 ++++---
10 src/core/pbuf.c          |  6 ++++++
11 src/core/tcp.c           | 39 +++++++++++++++++++++++++++++++++++++++
12 src/core/tcp_in.c        |  6 ++++++
13 src/core/tcp_out.c       | 11 +++++++++++
14 src/include/lwip/pbuf.h  |  3 +++
15 src/include/lwip/tcp.h   | 10 ++++++++++
16 src/include/lwipopts.h   |  7 +++++++
17 src/include/lwipsock.h   | 37 +++++++++++++++++++++++++++++++++++++
18 11 files changed, 148 insertions(+), 3 deletions(-)
19
20diff --git a/src/api/sockets.c b/src/api/sockets.c
21index 356e345..7a5da26 100644
22--- a/src/api/sockets.c
23+++ b/src/api/sockets.c
24@@ -605,6 +605,10 @@ alloc_socket(struct netconn *newconn, int accepted, int flags)
25      * (unless it has been created by accept()). */
26     sockets[i].sendevent  = (NETCONNTYPE_GROUP(newconn->type) == NETCONN_TCP ? (accepted != 0) : 1);
27     sockets[i].errevent   = 0;
28+    sockets[i].same_node_rx_ring = NULL;
29+    sockets[i].same_node_rx_ring_mz = NULL;
30+    sockets[i].same_node_tx_ring = NULL;
31+    sockets[i].same_node_tx_ring_mz = NULL;
32     return i + LWIP_SOCKET_OFFSET;
33   } else {
34     lwip_close(i);
35@@ -716,6 +720,11 @@ free_socket(struct lwip_sock *sock, int is_tcp)
36   /* Protect socket array */
37   SYS_ARCH_PROTECT(lev);
38
39+#if GAZELLE_ENABLE
40+  /* remove sock from same_node_recv_list */
41+  list_del_node_null(&sock->recv_list);
42+#endif
43+
44   freed = free_socket_locked(sock, is_tcp, &conn, &lastdata);
45   SYS_ARCH_UNPROTECT(lev);
46   /* don't use 'sock' after this line, as another task might have allocated it */
47@@ -780,6 +789,18 @@ lwip_accept4(int s, struct sockaddr *addr, socklen_t *addrlen, int flags)
48   LWIP_ASSERT("invalid socket index", (newsock >= LWIP_SOCKET_OFFSET) && (newsock < NUM_SOCKETS + LWIP_SOCKET_OFFSET));
49 #endif /* GAZELLE_ENABLE */
50   nsock = &sockets[newsock - LWIP_SOCKET_OFFSET];
51+#if GAZELLE_ENABLE
52+  struct tcp_pcb *pcb = newconn->pcb.tcp;
53+  if (pcb->client_rx_ring != NULL && pcb->client_tx_ring != NULL) {
54+    if (find_same_node_memzone(pcb, nsock) != 0) {
55+      netconn_delete(newconn);
56+      free_socket(nsock, 1);
57+      sock_set_errno(sock, ENOTCONN);
58+      done_socket(sock);
59+      return -1;
60+    }
61+  }
62+#endif
63
64   /* See event_callback: If data comes in right away after an accept, even
65    * though the server task might not have created a new socket yet.
66diff --git a/src/core/ipv4/ip4_frag.c b/src/core/ipv4/ip4_frag.c
67index f15b798..e01ea51 100644
68--- a/src/core/ipv4/ip4_frag.c
69+++ b/src/core/ipv4/ip4_frag.c
70@@ -729,6 +729,7 @@ ip_frag_free_pbuf_custom_ref(struct pbuf_custom_ref *p)
71
72 /** Free-callback function to free a 'struct pbuf_custom_ref', called by
73  * pbuf_free. */
74+#if !GAZELLE_ENABLE
75 static void
76 ipfrag_free_pbuf_custom(struct pbuf *p)
77 {
78@@ -740,6 +741,7 @@ ipfrag_free_pbuf_custom(struct pbuf *p)
79   }
80   ip_frag_free_pbuf_custom_ref(pcr);
81 }
82+#endif
83 #endif /* !LWIP_NETIF_TX_SINGLE_PBUF */
84
85 /**
86@@ -851,7 +853,9 @@ ip4_frag(struct pbuf *p, struct netif *netif, const ip4_addr_t *dest)
87       }
88       pbuf_ref(p);
89       pcr->original = p;
90+#if !GAZELLE_ENABLE
91       pcr->pc.custom_free_function = ipfrag_free_pbuf_custom;
92+#endif
93
94       /* Add it to end of rambuf's chain, but using pbuf_cat, not pbuf_chain
95        * so that it is removed when pbuf_dechain is later called on rambuf.
96diff --git a/src/core/netif.c b/src/core/netif.c
97index 70392cb..86b74a0 100644
98--- a/src/core/netif.c
99+++ b/src/core/netif.c
100@@ -1065,7 +1065,7 @@ netif_set_link_callback(struct netif *netif, netif_status_callback_fn link_callb
101 }
102 #endif /* LWIP_NETIF_LINK_CALLBACK */
103
104-#if ENABLE_LOOPBACK
105+#if !GAZELLE_ENABLE
106 /**
107  * @ingroup netif
108  * Send an IP packet to be received on the same netif (loopif-like).
109@@ -1184,6 +1184,7 @@ netif_loop_output(struct netif *netif, struct pbuf *p)
110
111   return ERR_OK;
112 }
113+#endif
114
115 #if LWIP_HAVE_LOOPIF
116 #if LWIP_IPV4
117@@ -1205,7 +1206,7 @@ netif_loop_output_ipv6(struct netif *netif, struct pbuf *p, const ip6_addr_t *ad
118 #endif /* LWIP_IPV6 */
119 #endif /* LWIP_HAVE_LOOPIF */
120
121-
122+#if !GAZELLE_ENABLE
123 /**
124  * Call netif_poll() in the main loop of your application. This is to prevent
125  * reentering non-reentrant functions like tcp_input(). Packets passed to
126@@ -1277,6 +1278,7 @@ netif_poll(struct netif *netif)
127   }
128   SYS_ARCH_UNPROTECT(lev);
129 }
130+#endif
131
132 #if !LWIP_NETIF_LOOPBACK_MULTITHREADING
133 /**
134@@ -1292,7 +1294,6 @@ netif_poll_all(void)
135   }
136 }
137 #endif /* !LWIP_NETIF_LOOPBACK_MULTITHREADING */
138-#endif /* ENABLE_LOOPBACK */
139
140 #if LWIP_NUM_NETIF_CLIENT_DATA > 0
141 /**
142diff --git a/src/core/pbuf.c b/src/core/pbuf.c
143index dd71519..2385e57 100644
144--- a/src/core/pbuf.c
145+++ b/src/core/pbuf.c
146@@ -69,6 +69,7 @@
147  */
148
149 #include "lwip/opt.h"
150+#include "lwipsock.h"
151
152 #include "lwip/pbuf.h"
153 #include "lwip/stats.h"
154@@ -189,6 +190,7 @@ pbuf_init_alloced_pbuf(struct pbuf *p, void *payload, u16_t tot_len, u16_t len,
155   p->flags = flags;
156   p->ref = 1;
157   p->if_idx = NETIF_NO_INDEX;
158+  p->pcb = NULL;
159 }
160
161 /**
162@@ -777,9 +779,13 @@ pbuf_free(struct pbuf *p)
163 #if LWIP_SUPPORT_CUSTOM_PBUF
164       /* is this a custom pbuf? */
165       if ((p->flags & PBUF_FLAG_IS_CUSTOM) != 0) {
166+#if GAZELLE_ENABLE
167+        gazelle_free_pbuf(p);
168+#else
169         struct pbuf_custom *pc = (struct pbuf_custom *)p;
170         LWIP_ASSERT("pc->custom_free_function != NULL", pc->custom_free_function != NULL);
171         pc->custom_free_function(p);
172+#endif
173       } else
174 #endif /* LWIP_SUPPORT_CUSTOM_PBUF */
175       {
176diff --git a/src/core/tcp.c b/src/core/tcp.c
177index 69a39f6..538a664 100644
178--- a/src/core/tcp.c
179+++ b/src/core/tcp.c
180@@ -116,6 +116,8 @@
181 #include <string.h>
182 #include <pthread.h>
183
184+#include "lwipsock.h"
185+
186 #ifdef LWIP_HOOK_FILENAME
187 #include LWIP_HOOK_FILENAME
188 #endif
189@@ -250,6 +252,18 @@ void
190 tcp_free(struct tcp_pcb *pcb)
191 {
192 #if GAZELLE_ENABLE
193+  if (pcb->free_ring == 1) {
194+    struct netconn *netconn = NULL;
195+    struct lwip_sock *sock = NULL;
196+    rte_ring_free(pcb->client_rx_ring);
197+    rte_ring_free(pcb->client_tx_ring);
198+    netconn = (struct netconn *)pcb->callback_arg;
199+    sock = get_socket(netconn->socket);
200+    rte_memzone_free(sock->same_node_rx_ring->mz);
201+    rte_memzone_free(sock->same_node_rx_ring_mz);
202+    rte_memzone_free(sock->same_node_tx_ring->mz);
203+    rte_memzone_free(sock->same_node_tx_ring_mz);
204+  }
205   vdev_unreg_done(pcb);
206   release_port(pcb->local_port);
207 #endif
208@@ -996,6 +1010,15 @@ tcp_listen_with_backlog_and_err(struct tcp_pcb *pcb, u8_t backlog, err_t *err)
209   /* pcb transfer to lpcb and reg into tcp_listen_pcbs. freeing pcb shouldn't release sock table in here.
210    * local_port=0 avoid to release sock table in tcp_free */
211   pcb->local_port = 0;
212+
213+  char name[RING_NAME_LEN];
214+  snprintf(name, sizeof(name), "listen_rx_ring_%u", lpcb->local_port);
215+  if (rte_ring_lookup(name) != NULL) {
216+      /* port reuse */
217+      lpcb->listen_rx_ring = NULL;
218+  } else {
219+      same_node_ring_create(&lpcb->listen_rx_ring, SAME_NODE_RING_SIZE, lpcb->local_port, "listen", "rx");
220+  }
221 #endif
222   tcp_free(pcb);
223 #if LWIP_CALLBACK_API
224@@ -1262,6 +1285,16 @@ tcp_connect(struct tcp_pcb *pcb, const ip_addr_t *ipaddr, u16_t port,
225 #endif /* SO_REUSE */
226   }
227
228+#if GAZELLE_ENABLE
229+  /* communication between processes on the same node */
230+  if (ip_addr_cmp(&pcb->local_ip, &pcb->remote_ip)) {
231+    ret = create_same_node_ring(pcb);
232+    if (ret != 0) {
233+      return ret;
234+    }
235+  }
236+#endif
237+
238   iss = tcp_next_iss(pcb);
239   pcb->rcv_nxt = 0;
240   pcb->snd_nxt = iss;
241@@ -2090,7 +2123,13 @@ tcp_alloc(u8_t prio)
242     pcb->keep_intvl = TCP_KEEPINTVL_DEFAULT;
243     pcb->keep_cnt   = TCP_KEEPCNT_DEFAULT;
244 #endif /* LWIP_TCP_KEEPALIVE */
245+#if GAZELLE_ENABLE
246+    pcb->client_rx_ring = NULL;
247+    pcb->client_tx_ring = NULL;
248+    pcb->free_ring = 0;
249+#endif
250   }
251+
252   return pcb;
253 }
254
255diff --git a/src/core/tcp_in.c b/src/core/tcp_in.c
256index dd83260..719cf04 100644
257--- a/src/core/tcp_in.c
258+++ b/src/core/tcp_in.c
259@@ -42,6 +42,7 @@
260  */
261
262 #include "lwip/opt.h"
263+#include "lwipsock.h"
264
265 #if LWIP_TCP /* don't build if not configured for use in lwipopts.h */
266
267@@ -806,6 +807,11 @@ tcp_listen_input(struct tcp_pcb_listen *pcb)
268
269 #if GAZELLE_ENABLE
270     vdev_reg_done(REG_RING_TCP_CONNECT, npcb);
271+    if (ip_addr_cmp(&npcb->local_ip, &npcb->remote_ip)) {
272+      if (find_same_node_ring(npcb) != 0) {
273+        return;
274+      }
275+    }
276 #endif
277
278     /* Parse any options in the SYN. */
279diff --git a/src/core/tcp_out.c b/src/core/tcp_out.c
280index 8100e18..b1c317d 100644
281--- a/src/core/tcp_out.c
282+++ b/src/core/tcp_out.c
283@@ -725,6 +725,10 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u16_t len, u8_t apiflags)
284       goto memerr;
285     }
286
287+#if GAZELLE_ENABLE
288+    lstack_calculate_aggregate(2, p->tot_len);
289+#endif
290+
291     if ((seg = tcp_create_segment(pcb, p, 0, pcb->snd_lbb + pos, optflags)) == NULL) {
292 #if GAZELLE_ENABLE
293       if (pos > 0) {
294@@ -1705,6 +1709,10 @@ tcp_output_segment(struct tcp_seg *seg, struct tcp_pcb *pcb, struct netif *netif
295   int seg_chksum_was_swapped = 0;
296 #endif
297
298+#if USE_LIBOS
299+  lstack_calculate_aggregate(1, seg->len);
300+#endif
301+
302   LWIP_ASSERT("tcp_output_segment: invalid seg", seg != NULL);
303   LWIP_ASSERT("tcp_output_segment: invalid pcb", pcb != NULL);
304   LWIP_ASSERT("tcp_output_segment: invalid netif", netif != NULL);
305@@ -1899,6 +1907,8 @@ tcp_output_segment(struct tcp_seg *seg, struct tcp_pcb *pcb, struct netif *netif
306   PERF_START(PERF_LAYER_IP, PERF_POINT_IP_SEND);
307
308   NETIF_SET_HINTS(netif, &(pcb->netif_hints));
309+
310+  seg->p->pcb = pcb;
311   err = ip_output_if(seg->p, &pcb->local_ip, &pcb->remote_ip, pcb->ttl,
312                      pcb->tos, IP_PROTO_TCP, netif);
313   NETIF_RESET_HINTS(netif);
314@@ -2236,6 +2246,7 @@ tcp_output_control_segment(struct tcp_pcb *pcb, struct pbuf *p,
315   err_t err;
316   struct netif *netif;
317
318+  p->pcb = pcb;
319   LWIP_ASSERT("tcp_output_control_segment: invalid pbuf", p != NULL);
320
321   if (pcb == NULL || pcb->pcb_if == NULL) {
322diff --git a/src/include/lwip/pbuf.h b/src/include/lwip/pbuf.h
323index 6c4ca44..9321afc 100644
324--- a/src/include/lwip/pbuf.h
325+++ b/src/include/lwip/pbuf.h
326@@ -235,6 +235,7 @@ struct pbuf {
327   u8_t head;
328   struct pbuf *last;
329   pthread_spinlock_t pbuf_lock;
330+  struct tcp_pcb *pcb;
331 #endif /* GAZELLE_ENABLE CHECKSUM_OFFLOAD_SWITCH */
332
333   /** In case the user needs to store data custom data on a pbuf */
334@@ -263,7 +264,9 @@ struct pbuf_custom {
335   /** The actual pbuf */
336   struct pbuf pbuf;
337   /** This function is called when pbuf_free deallocates this pbuf(_custom) */
338+#if !GAZELLE_ENABLE
339   pbuf_free_custom_fn custom_free_function;
340+#endif
341 };
342 #endif /* LWIP_SUPPORT_CUSTOM_PBUF */
343
344diff --git a/src/include/lwip/tcp.h b/src/include/lwip/tcp.h
345index b822f40..e13099c 100644
346--- a/src/include/lwip/tcp.h
347+++ b/src/include/lwip/tcp.h
348@@ -260,6 +260,9 @@ struct tcp_pcb_listen {
349   u8_t master_lpcb;
350 #endif
351
352+#if GAZELLE_ENABLE
353+  struct rte_ring *listen_rx_ring;
354+#endif
355 };
356
357
358@@ -418,6 +421,13 @@ struct tcp_pcb {
359   u8_t rcv_scale;
360 #endif
361
362+#if GAZELLE_ENABLE
363+#define SAME_NODE_RING_SIZE 512
364+  struct rte_ring *client_rx_ring;
365+  struct rte_ring *client_tx_ring;
366+  u8_t free_ring;
367+#endif
368+
369   u8_t need_tso_send;
370 };
371
372diff --git a/src/include/lwipopts.h b/src/include/lwipopts.h
373index 414ead4..0d2a6d9 100644
374--- a/src/include/lwipopts.h
375+++ b/src/include/lwipopts.h
376@@ -235,4 +235,11 @@
377
378 #define SIOCSHIWAT 1
379
380+/*
381+   ------------------------------------
382+   ---------- Netif options ----------
383+   ------------------------------------
384+*/
385+#define LWIP_NETIF_LOOPBACK 1
386+
387 #endif /* __LWIPOPTS_H__ */
388diff --git a/src/include/lwipsock.h b/src/include/lwipsock.h
389index 7e16ec8..f917d8a 100644
390--- a/src/include/lwipsock.h
391+++ b/src/include/lwipsock.h
392@@ -65,7 +65,19 @@ struct protocol_stack;
393 struct wakeup_poll;
394 struct rte_ring;
395 #include <rte_common.h>
396+#include <rte_memzone.h>
397+
398+// 8M: 8 * 1024 * 1024 = 8388608 (a power of two, so LEN - 1 below is an all-ones bit mask)
399+#define SAME_NODE_RING_LEN (unsigned long long)(8388608)
400+#define SAME_NODE_RING_MASK (unsigned long long)(8388608 - 1)
401+#define RING_NAME_LEN 32
402+struct same_node_ring { /* per-direction same-node state; lwip_sock holds one for rx and one for tx */
403+    const struct rte_memzone *mz; /* memzone handle; released with rte_memzone_free() in tcp_free() */
404+    unsigned long long sndbegin; /* read with __atomic_load_n() in same_node_ring_count() */
405+    unsigned long long sndend; /* same_node_ring_count() reports sndend - sndbegin */
406+};
407 #endif
408+
409 /** Contains all internal pointers and states used for a socket */
410 struct lwip_sock {
411   /** sockets currently are built on netconns, each socket has one netconn */
412@@ -120,9 +132,25 @@ struct lwip_sock {
413   struct protocol_stack *stack;
414   struct rte_ring *recv_ring;
415   struct rte_ring *send_ring;
416+
417+  /* same-node rx/tx data rings and their associated memzone handles */
418+  struct same_node_ring *same_node_rx_ring;
419+  const struct rte_memzone *same_node_rx_ring_mz;
420+  struct same_node_ring *same_node_tx_ring;
421+  const struct rte_memzone *same_node_tx_ring_mz;
422 #endif
423 };
424
425+#if GAZELLE_ENABLE
426+static inline unsigned same_node_ring_count(struct lwip_sock *sock)
427+{ /* sock->same_node_rx_ring is dereferenced below without a NULL check */
428+  const unsigned long long cur_begin = __atomic_load_n(&sock->same_node_rx_ring->sndbegin, __ATOMIC_RELAXED);
429+  const unsigned long long cur_end = __atomic_load_n(&sock->same_node_rx_ring->sndend, __ATOMIC_RELAXED);
430+  /* Return sndend - sndbegin from the two relaxed snapshots above; the unsigned long long difference is narrowed to unsigned. */
431+  return cur_end - cur_begin;
432+}
433+#endif
434+
435 #ifndef set_errno
436 #define set_errno(err) do { if (err) { errno = (err); } } while(0)
437 #endif
438@@ -142,6 +170,15 @@ extern struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size
439 extern void gazelle_init_sock(int32_t fd);
440 extern void gazelle_clean_sock(int32_t fd);
441 extern void write_lwip_over(struct lwip_sock *sock);
442+extern void netif_poll(struct netif *netif);
443+extern err_t netif_loop_output(struct netif *netif, struct pbuf *p);
444+extern err_t find_same_node_memzone(struct tcp_pcb *pcb, struct lwip_sock *nsock);
445+extern err_t same_node_memzone_create(const struct rte_memzone **zone, int size, int port, char *name, char *);
446+extern err_t same_node_ring_create(struct rte_ring **ring, int size, int port, char *name, char *rx);
447+extern err_t create_same_node_ring(struct tcp_pcb *pcb);
448+extern err_t find_same_node_ring(struct tcp_pcb *pcb);
449+extern void gazelle_free_pbuf(struct pbuf *pbuf);
450+extern void lstack_calculate_aggregate(int type, uint32_t len);
451 #endif /* GAZELLE_ENABLE */
452
453 struct lwip_sock *get_socket(int s);
454--
4552.23.0
456
457