• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "spunge_core.h"
17 #ifdef FILLP_LINUX
18 #include <errno.h>
19 #endif
20 #include <stdio.h>
21 #include "securec.h"
22 #include "sysio.h"
23 #include "res.h"
24 #include "socket_common.h"
25 #include "fillp_flow_control.h"
26 #include "timing_wheel.h"
27 #include "fillp_buf_item.h"
28 #include "callbacks.h"
29 #include "fillp_common.h"
30 #include "spunge.h"
31 #include "spunge_stack.h"
32 #include "spunge_message.h"
33 #include "fillp_output.h"
34 #include "fillp_input.h"
35 #include "fillp_dfx.h"
36 
37 #ifdef __cplusplus
38 extern "C" {
39 #endif
40 
41 SYS_ARCH_SEM g_resDeinitSem;
42 #define BIT_MOVE_CNT 3
43 #define RECV_RATE_PAR_LOW 0.98
44 #define RECV_RATE_PAT_HIGH 1.02
45 #define RECV_STATE_THRESHOLD 10
46 
47 void SpungeFreeInstanceResource(struct SpungeInstance *inst);
48 
49 
SpungeDoRecvCycle(struct SockOsSocket * osSock,struct SpungeInstance * inst)50 void SpungeDoRecvCycle(struct SockOsSocket *osSock, struct SpungeInstance *inst)
51 {
52     FILLP_UINT32 i;
53     struct NetBuf buf;
54     struct SpungePcb *spcb = FILLP_NULL_PTR;
55 
56     if (!OS_SOCK_OPS_FUNC_VALID(osSock, fetchPacket)) {
57         return;
58     }
59 
60     (void)memset_s(&buf, sizeof(buf), 0, sizeof(buf));
61     buf.p = inst->tmpBuf[0];
62     for (i = 0; i < g_resource.udp.rxBurst; i++) {
63         spcb = osSock->ioSock->ops->fetchPacket((void *)osSock, (void *)&buf, 0);
64         if (spcb != FILLP_NULL_PTR) {
65             FillpDoInput(&spcb->fpcb, &buf, inst);
66             continue;
67         } else {
68             break;
69         }
70     }
71 }
72 
SpungeCalExpectedBytes(FILLP_UINT32 * sendPktNum,struct SpungePcb * pcb,struct FtSocket * sock,struct FillpFlowControl * flowControl,FILLP_LLONG detaTime)73 static FILLP_UINT32 SpungeCalExpectedBytes(FILLP_UINT32 *sendPktNum, struct SpungePcb *pcb,
74     struct FtSocket *sock, struct FillpFlowControl *flowControl, FILLP_LLONG detaTime)
75 {
76     FILLP_UINT32 bytesExpected;
77     FILLP_UINT32 pktNum = sock->resConf.udp.txBurst;
78 
79     if (flowControl->sendInterval) {
80         pktNum = (FILLP_UINT32)(detaTime / flowControl->sendInterval);
81     }
82 
83     if (pktNum <= (sock->resConf.udp.txBurst)) {
84         /* sendRate is kbps */
85         FILLP_ULLONG bitsExpected = (FILLP_ULLONG)(detaTime * flowControl->sendRate / FILLP_ONE_SECOND);
86         bitsExpected >>= FILLP_TIME_PRECISION;
87         bytesExpected = (FILLP_UINT32)(FILLP_UTILS_BIT2BYTE(bitsExpected));
88         pcb->fpcb.statistics.traffic.packExpSendBytes += bytesExpected;
89         bytesExpected += pcb->fpcb.send.flowControl.remainBytes;
90     } else {
91         pktNum = sock->resConf.udp.txBurst;
92         bytesExpected = (FILLP_UINT32)(pktNum * pcb->fpcb.pktSize);
93         pcb->fpcb.statistics.traffic.packExpSendBytes += bytesExpected;
94     }
95     *sendPktNum = pktNum;
96     FILLP_LOGDBG("before_send_cycle fillp_sock_id:%d unRecvNum:%u, unAck:%u\r\n",
97         sock->index, pcb->fpcb.send.unrecvList.nodeNum, pcb->fpcb.send.unackList.count);
98     return bytesExpected;
99 }
100 
SpungeDoSendUpdate(struct SpungePcb * pcb,FILLP_UINT32 sendBytes,FILLP_UINT32 bytesExpected)101 static void SpungeDoSendUpdate(struct SpungePcb *pcb, FILLP_UINT32 sendBytes, FILLP_UINT32 bytesExpected)
102 {
103     if ((sendBytes > 0) && (bytesExpected >= (FILLP_UINT32)sendBytes)) {
104         pcb->fpcb.statistics.traffic.packSendBytes += sendBytes;
105         pcb->fpcb.send.flowControl.remainBytes = (bytesExpected - (FILLP_UINT32)sendBytes);
106     } else {
107         pcb->fpcb.send.flowControl.remainBytes = 0;
108     }
109 }
110 
SpungeDoSendCycle(struct SpungePcb * pcb,struct SpungeInstance * inst,FILLP_LLONG detaTime)111 void SpungeDoSendCycle(struct SpungePcb *pcb, struct SpungeInstance *inst, FILLP_LLONG detaTime)
112 {
113     FILLP_UINT32 sendPktNum;
114     FILLP_UINT32 sendBytes = 0;
115     FILLP_UINT32 tmpBytes = 0;
116     FILLP_UINT32 bytesExpected;
117 
118     if ((pcb == FILLP_NULL_PTR) || (pcb->conn == FILLP_NULL_PTR)) {
119         FILLP_LOGERR("NULL Pointer");
120         return;
121     }
122 
123     FILLP_SIZE_T pktSize = pcb->fpcb.pktSize;
124     struct FillpFlowControl *flowControl = &pcb->fpcb.send.flowControl;
125     struct FtNetconn *conn = (struct FtNetconn *)pcb->conn;
126     struct FtSocket *sock = (struct FtSocket *)conn->sock;
127 
128     if (sock == FILLP_NULL_PTR) {
129         FILLP_LOGERR("NULL Pointer");
130         return;
131     }
132 
133     flowControl->sendTime = inst->curTime;
134     bytesExpected = SpungeCalExpectedBytes(&sendPktNum, pcb, sock, flowControl, detaTime);
135 
136     /* flow control alg may need to change bytesExpected for the this send cycle a according to current status */
137     if (pcb->fpcb.algFuncs.updateExpectSendBytes != FILLP_NULL_PTR) {
138         pcb->fpcb.algFuncs.updateExpectSendBytes(&pcb->fpcb, &bytesExpected);
139     }
140 
141     /* If BytesExpected less than pktSize, no need to send, just store the remainBytes */
142     if (bytesExpected >= pktSize) {
143         /* Make sure that the send bytes won't more than bytesExpected */
144         tmpBytes = (FILLP_UINT32)(bytesExpected - pktSize);
145 
146         sendBytes = FillpSendOne(&pcb->fpcb, tmpBytes, sendPktNum);
147         SpungeDoSendUpdate(pcb, sendBytes, bytesExpected);
148     } else {
149         pcb->fpcb.send.flowControl.remainBytes = bytesExpected;
150     }
151 
152     FILLP_LOGDBG("after_send_cycle: fillp_sock_id:%d expected bytes:%u sentBytes:%u remain:%u \r\n",
153         sock->index, sendBytes, tmpBytes, pcb->fpcb.send.flowControl.remainBytes);
154 
155     if ((pcb->fpcb.send.flowControl.remainBytes) || (!HLIST_EMPTY(&pcb->fpcb.send.unSendList)) ||
156         (pcb->fpcb.send.redunList.nodeNum) || (pcb->fpcb.send.unrecvList.nodeNum)) {
157         FillpEnableSendTimer(&pcb->fpcb);
158     } else {
159         FillpDisableSendTimer(&pcb->fpcb);
160     }
161 
162     return;
163 }
164 
SpungeDestroySockTableSocket(struct FtSocketTable * table,int tableIndex)165 static void SpungeDestroySockTableSocket(struct FtSocketTable *table, int tableIndex)
166 {
167     struct FtSocket *sock = FILLP_NULL_PTR;
168 
169     if (table == FILLP_NULL_PTR) {
170         return;
171     }
172 
173     sock = table->sockPool[tableIndex];
174     if (sock == FILLP_NULL_PTR) {
175         return;
176     }
177     (void)SYS_ARCH_RWSEM_DESTROY(&sock->sockConnSem);
178     (void)SYS_ARCH_SEM_DESTROY(&sock->connBlockSem);
179     (void)SYS_ARCH_SEM_DESTROY(&sock->sockCloseProtect);
180     (void)SYS_ARCH_SEM_DESTROY(&sock->epollTaskListLock);
181     SpungeFree(sock, SPUNGE_ALLOC_TYPE_CALLOC);
182     table->sockPool[tableIndex] = FILLP_NULL_PTR;
183 }
184 
185 /* SFT */
SpungeCreateSockTable(FILLP_UINT maxSock)186 struct FtSocketTable *SpungeCreateSockTable(FILLP_UINT maxSock)
187 {
188     int i;
189     struct FtSocketTable *table;
190     table = (struct FtSocketTable *)SpungeAlloc(1, sizeof(struct FtSocketTable), SPUNGE_ALLOC_TYPE_CALLOC);
191     if (table == FILLP_NULL_PTR) {
192         FILLP_LOGERR("Failed to allocate memory for socket table \r\n");
193         return FILLP_NULL_PTR;
194     }
195 
196     table->freeQueqe = FillpQueueCreate("sock_free_table", (FILLP_SIZE_T)maxSock, SPUNGE_ALLOC_TYPE_CALLOC);
197 
198     if (table->freeQueqe == FILLP_NULL_PTR) {
199         FILLP_LOGERR("Fail to create socket table free queue");
200         goto ERR_FAIL;
201     }
202 
203     FillpQueueSetConsSafe(table->freeQueqe, FILLP_TRUE);
204     FillpQueueSetProdSafe(table->freeQueqe, FILLP_TRUE);
205 
206     table->sockPool =
207         (struct FtSocket **)SpungeAlloc(maxSock, (FILLP_SIZE_T)sizeof(struct FtSocket *), SPUNGE_ALLOC_TYPE_CALLOC);
208     if (table->sockPool == FILLP_NULL_PTR) {
209         FILLP_LOGERR("Failed to allocate memory for sockPool of socket table");
210         goto ERR_FAIL;
211     }
212 
213     table->size = (FILLP_INT)maxSock;
214     SYS_ARCH_ATOMIC_SET(&table->used, 0);
215     for (i = 0; i < table->size; i++) {
216         table->sockPool[i] = FILLP_NULL_PTR;
217     }
218 
219     return table;
220 
221 ERR_FAIL:
222     if (table->freeQueqe != FILLP_NULL_PTR) {
223         FillpQueueDestroy(table->freeQueqe);
224         table->freeQueqe = FILLP_NULL_PTR;
225     }
226 
227     if (table->sockPool != FILLP_NULL_PTR) {
228         SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
229         table->sockPool = FILLP_NULL_PTR;
230     }
231 
232     SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
233 
234     return FILLP_NULL_PTR;
235 }
236 
237 
238 /* SFT */
SpungeDestroySockTable(struct FtSocketTable * table)239 void SpungeDestroySockTable(struct FtSocketTable *table)
240 {
241     FILLP_INT i;
242 
243     for (i = 0; i < SYS_ARCH_ATOMIC_READ(&table->used); i++) {
244         SpungeDestroySockTableSocket(table, i);
245     }
246 
247     if (table->freeQueqe != FILLP_NULL_PTR) {
248         FillpQueueDestroy(table->freeQueqe);
249         table->freeQueqe = FILLP_NULL_PTR;
250     }
251 
252     if (table->sockPool != FILLP_NULL_PTR) {
253         SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
254         table->sockPool = FILLP_NULL_PTR;
255     }
256 
257     /* NULL check for table already done at the caller, and also in the above
258     check, table is dereferenced without validating, so need to check for NULL
259     again here before freeing it */
260     SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
261 }
262 
SpungeInstMsgBoxInit(struct SpungeInstance * inst)263 static FILLP_INT SpungeInstMsgBoxInit(struct SpungeInstance *inst)
264 {
265     (void)SYS_ARCH_ATOMIC_SET(&inst->msgUsingCount, 0);
266     inst->msgBox = FillpQueueCreate("spunge_msg_box", g_spunge->resConf.maxMsgItemNum, SPUNGE_ALLOC_TYPE_MALLOC);
267     if (inst->msgBox == FILLP_NULL_PTR) {
268         FILLP_LOGERR("Init inst->msgBox Fail");
269         return ERR_NORES;
270     }
271 
272     FillpQueueSetConsSafe(inst->msgBox, FILLP_TRUE);
273     FillpQueueSetProdSafe(inst->msgBox, FILLP_TRUE);
274 
275     inst->msgPool = SpungeMsgCreatePool(FILLP_MSG_ITEM_INIT_NUM, (int)g_spunge->resConf.maxMsgItemNum);
276     if (inst->msgPool == FILLP_NULL_PTR) {
277         FILLP_LOGERR("create msg pool fail");
278         return ERR_NORES;
279     }
280 
281     DympSetConsSafe(inst->msgPool, FILLP_TRUE);
282     DympSetProdSafe(inst->msgPool, FILLP_TRUE);
283     return ERR_OK;
284 }
285 
SpungeInstSendInit(struct SpungeInstance * inst)286 static FILLP_INT SpungeInstSendInit(struct SpungeInstance *inst)
287 {
288     int i;
289 
290     /* To control on client sending */
291     inst->rateControl.connectionNum = FILLP_NULL;
292 
293     inst->rateControl.recv.maxRate = g_resource.flowControl.maxRecvRate;
294 
295     /* To control on server sending */
296     inst->rateControl.send.maxRate = g_resource.flowControl.maxRate;
297 
298     inst->thresdSemInited = FILLP_FALSE;
299     int ret = SYS_ARCH_SEM_INIT(&inst->threadSem, 1);
300     if (ret != FILLP_OK) {
301         FILLP_LOGERR("SYS_ARCH_SEM_INIT fails");
302         return ERR_NORES;
303     }
304     inst->thresdSemInited = FILLP_TRUE;
305 
306     inst->unsendItem =
307         SpungeAlloc(FILLP_UNSEND_BOX_LOOP_CHECK_BURST, sizeof(struct FillpPcbItem *), SPUNGE_ALLOC_TYPE_CALLOC);
308     if (inst->unsendItem == FILLP_NULL_PTR) {
309         FILLP_LOGERR("inst->unsendItem NULL");
310         return ERR_NORES;
311     }
312 
313     for (i = 0; i < FILLP_VLEN; i++) {
314         inst->tmpBuf[i] = SpungeAlloc(1, (sizeof(FILLP_CHAR) * FILLP_MAX_PKT_SIZE), SPUNGE_ALLOC_TYPE_MALLOC);
315         if (inst->tmpBuf[i] == FILLP_NULL_PTR) {
316             FILLP_LOGERR("inst->tmpBuf[%d] is NULL", i);
317             return ERR_NORES;
318         }
319     }
320 
321     HLIST_INIT(&inst->sendPcbList);
322     for (i = 0; i < FILLP_INST_UNSEND_BOX_NUM; i++) {
323         inst->unsendBox[i] = FillpQueueCreate("socket_send_box", FILLP_INST_UNSEND_BOX_SIZE, SPUNGE_ALLOC_TYPE_MALLOC);
324         if (inst->unsendBox[i] == FILLP_NULL_PTR) {
325             FILLP_LOGERR("inst->unsendBox[%d] is NULL", i);
326             return ERR_NORES;
327         }
328 
329         FillpQueueSetConsSafe(inst->unsendBox[i], FILLP_FALSE);
330         FillpQueueSetProdSafe(inst->unsendBox[i], FILLP_TRUE);
331     }
332     return ERR_OK;
333 }
334 
SpungeInstTimerInit(struct SpungeInstance * inst)335 static void SpungeInstTimerInit(struct SpungeInstance *inst)
336 {
337     inst->curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
338 
339     (void)memset_s(&inst->macInfo, sizeof(FillpMacInfo), 0, sizeof(FillpMacInfo));
340     FillpMacTimerExpire(&inst->macInfo, inst->curTime);
341 
342     FillpTimingWheelInit(&inst->timingWheel, FILLP_TIMING_WHEEL_ACCURACY);
343 
344     /* Init the global timers */
345     FtGlobalTimerInit(inst);
346     SpungeInitTokenBucket(inst);
347 }
348 
SpungeThreadInit(struct SpungeInstance * inst)349 static FILLP_INT SpungeThreadInit(struct SpungeInstance *inst)
350 {
351     FILLP_THREAD threadId;
352 
353     inst->mainThreadParam.func = SpungeInstanceMainThread;
354     inst->mainThreadParam.param = inst;
355     inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
356 
357     inst->hasInited = FILLP_TRUE;
358 
359     (void)FILLP_SYS_START_NEWTHREAD(&inst->mainThreadParam, &threadId);
360 
361     return ERR_OK;
362 }
363 
SpungeInstInit(struct SpungeInstance * inst)364 FILLP_INT SpungeInstInit(struct SpungeInstance *inst)
365 {
366     FILLP_INT err;
367 
368     if (inst == FILLP_NULL_PTR) {
369         FILLP_LOGERR("Init inst null");
370         return ERR_NULLPTR;
371     }
372 
373     if (inst->hasInited) {
374         FILLP_LOGERR("Stack has been inited");
375         return ERR_OK;
376     }
377 
378     err = SpungeInstMsgBoxInit(inst);
379     if (err != ERR_OK) {
380         goto FAIL;
381     }
382 
383     HLIST_INIT(&inst->osSockist);
384     HLIST_INIT(&inst->pcbList.list);
385 
386     err = SpungeInstSendInit(inst);
387     if (err != ERR_OK) {
388         goto FAIL;
389     }
390 
391     SpungeInstTimerInit(inst);
392 
393     inst->cleanseDataCtr = 0;
394 
395     err = SpungeThreadInit(inst);
396     if (err != ERR_OK) {
397         goto FAIL;
398     }
399 
400     return ERR_OK;
401 
402 FAIL:
403     SpungeFreeInstanceResource(inst);
404     return err;
405 }
406 
SpungeSysCallRegisted(void)407 static FILLP_INT SpungeSysCallRegisted(void)
408 {
409     FILLP_INT ret;
410 
411     ret = FillpValidateFuncPtr(&g_fillpOsBasicLibFun, sizeof(FillpSysLibBasicCallbackFuncSt));
412     if (ret != ERR_OK) {
413         SET_ERRNO(FILLP_EINVAL);
414         return ret;
415     }
416 
417     ret = FillpValidateFuncPtr(&g_fillpOsSemLibFun, sizeof(FillpSysLibSemCallbackFuncSt));
418     if (ret != ERR_OK) {
419         SET_ERRNO(FILLP_EINVAL);
420         return ret;
421     }
422 
423     ret = FillpValidateFuncPtr(&g_fillpOsSocketLibFun, sizeof(FillpSysLibSockCallbackFuncSt));
424     if (ret != ERR_OK) {
425         SET_ERRNO(FILLP_EINVAL);
426         return ret;
427     }
428 
429     return ERR_OK;
430 }
431 
FtFreeEpollResource(void)432 static void FtFreeEpollResource(void)
433 {
434     if (g_spunge->epitemPool != FILLP_NULL_PTR) {
435         DympDestroyPool(g_spunge->epitemPool);
436         g_spunge->epitemPool = FILLP_NULL_PTR;
437     }
438 
439     if (g_spunge->eventpollPool != FILLP_NULL_PTR) {
440         DympDestroyPool(g_spunge->eventpollPool);
441         g_spunge->eventpollPool = FILLP_NULL_PTR;
442     }
443     return;
444 }
445 
FtAllocateEpollResource(void)446 static FILLP_INT FtAllocateEpollResource(void)
447 {
448     DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
449     g_spunge->epitemPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
450         sizeof(struct EpItem), FILLP_TRUE, &itemOperaCb);
451     if (g_spunge->epitemPool == FILLP_NULL_PTR) {
452         FILLP_LOGERR("create mem pool for g_spunge->epitemPool failed");
453         return ERR_NORES;
454     }
455     DympSetConsSafe(g_spunge->epitemPool, FILLP_TRUE);
456     DympSetProdSafe(g_spunge->epitemPool, FILLP_TRUE);
457 
458     g_spunge->eventpollPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
459         sizeof(struct EventPoll), FILLP_TRUE, &itemOperaCb);
460     if (g_spunge->eventpollPool == FILLP_NULL_PTR) {
461         FtFreeEpollResource();
462         FILLP_LOGERR("create Dym pool for g_spunge->eventpollPool failed");
463         return ERR_NORES;
464     }
465     DympSetConsSafe(g_spunge->eventpollPool, FILLP_TRUE);
466     DympSetProdSafe(g_spunge->eventpollPool, FILLP_TRUE);
467     return ERR_OK;
468 }
469 
SpungeAllocInstRes(void)470 static int SpungeAllocInstRes(void)
471 {
472     FILLP_UINT i;
473     FILLP_UINT j;
474     FILLP_INT err;
475 
476     for (i = 0; i < g_spunge->insNum; i++) {
477         (void)memset_s(&g_spunge->instPool[i], sizeof(struct SpungeInstance), 0, sizeof(struct SpungeInstance));
478         g_spunge->instPool[i].instIndex = (FILLP_INT)i;
479         err = SpungeInstInit(&g_spunge->instPool[i]);
480         if (err == ERR_OK) {
481             continue;
482         }
483         FILLP_LOGERR("SpungeInstInit failed :: Instance number :: %u\r\n", i);
484 
485         /* Release instances which are created success */
486         if (i > 0) {
487             g_spunge->insNum = i;
488 
489             g_spunge->hasDeinitBlked = FILLP_TRUE;
490             for (j = 0; j < g_spunge->insNum; j++) {
491                 g_spunge->instPool[j].waitTobeCoreKilled = FILLP_TRUE;
492             }
493 
494             /* After this step g_spunge will be freed, it should not be accessed, caller has check for NULL pointer
495                 before accessing, so it will not cause problem */
496             (void)SYS_ARCH_SEM_WAIT(&g_resDeinitSem);
497         }
498         return err;
499     }
500 
501     return ERR_OK;
502 }
503 
SpungeFreeInstSendRecv(struct SpungeInstance * inst)504 static void SpungeFreeInstSendRecv(struct SpungeInstance *inst)
505 {
506     int j;
507     if (inst->thresdSemInited) {
508         (void)SYS_ARCH_SEM_DESTROY(&inst->threadSem);
509         inst->thresdSemInited = FILLP_FALSE;
510     }
511 
512     for (j = 0; j < FILLP_INST_UNSEND_BOX_NUM; j++) {
513         if (inst->unsendBox[j] != FILLP_NULL_PTR) {
514             FillpQueueDestroy(inst->unsendBox[j]);
515             inst->unsendBox[j] = FILLP_NULL_PTR;
516         } else {
517             break;
518         }
519     }
520     if (inst->unsendItem != FILLP_NULL_PTR) {
521         SpungeFree(inst->unsendItem, SPUNGE_ALLOC_TYPE_CALLOC);
522         inst->unsendItem = FILLP_NULL_PTR;
523     }
524 
525     for (j = 0; j < FILLP_VLEN; j++) {
526         if (inst->tmpBuf[j] == FILLP_NULL_PTR) {
527             break;
528         }
529         SpungeFree(inst->tmpBuf[j], SPUNGE_ALLOC_TYPE_MALLOC);
530         inst->tmpBuf[j] = FILLP_NULL_PTR;
531     }
532 }
533 
SpungeFreeInstanceResource(struct SpungeInstance * inst)534 void SpungeFreeInstanceResource(struct SpungeInstance *inst)
535 {
536     if (inst == FILLP_NULL_PTR) {
537         return;
538     }
539 
540     if (inst->msgBox != FILLP_NULL_PTR) {
541         FillpQueueDestroy(inst->msgBox);
542         inst->msgBox = FILLP_NULL_PTR;
543     }
544 
545     while (SYS_ARCH_ATOMIC_READ(&inst->msgUsingCount) > 0) {
546         FILLP_SLEEP_MS(1);
547     }
548 
549     if (inst->msgPool != FILLP_NULL_PTR) {
550         SpungeMsgPoolDestroy(inst->msgPool);
551         inst->msgPool = FILLP_NULL_PTR;
552     }
553 
554     SpungeFreeInstSendRecv(inst);
555 
556     inst->hasInited = FILLP_FALSE;
557 }
558 
FtGetSpungeRes(struct SpungeResConf * resConf)559 static void FtGetSpungeRes(struct SpungeResConf *resConf)
560 {
561     (void)memset_s(resConf, sizeof(struct SpungeResConf), 0, sizeof(struct SpungeResConf));
562 
563     resConf->maxInstNum = (FILLP_UINT)UTILS_MIN(g_resource.common.maxInstNum, MAX_SPUNGEINSTANCE_NUM);
564     resConf->maxSockNum = g_resource.common.maxSockNum;
565     resConf->maxConnNum = g_resource.common.maxConnNum;
566     resConf->maxMsgItemNum    = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_SPUNGE_EVENTG_MULT_NUM);
567     resConf->maxTimerItemNum  = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
568     resConf->maxEpollEventNum = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
569     resConf->maxEpollItemNum  = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
570 }
571 
FtGlobalTimerInit(struct SpungeInstance * inst)572 void FtGlobalTimerInit(struct SpungeInstance *inst)
573 {
574     /* Initialize the Fairness timer */
575     inst->fairTimerNode.cbNode.cb = SpinstLoopFairnessChecker;
576     inst->fairTimerNode.cbNode.arg = (void *)inst;
577     inst->fairTimerNode.interval = SPUNGE_WEIGHT_ADJUST_INTERVAL;
578     FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
579         &inst->fairTimerNode);
580     /* Initialize the MAC timer */
581     inst->macTimerNode.cbNode.cb = SpinstLoopMacTimerChecker;
582     inst->macTimerNode.cbNode.arg = (void *)inst;
583     inst->macTimerNode.interval = FILLP_KEY_REFRESH_TIME;
584     FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
585         &inst->macTimerNode);
586     return;
587 }
588 
SpungeCheckCallbacks(void)589 static FILLP_INT SpungeCheckCallbacks(void)
590 {
591     FILLP_INT ret = SpungeSysCallRegisted();
592     return ret;
593 }
594 
FtInitGlobalUdpIo(void)595 static FILLP_INT FtInitGlobalUdpIo(void)
596 {
597     g_udpIo.readSet = FILLP_FD_CREATE_FD_SET();
598     if (g_udpIo.readSet == FILLP_NULL_PTR) {
599         FILLP_LOGERR("Malloc g_udpIo.readSet failed");
600         return ERR_NORES;
601     }
602 
603     g_udpIo.readableSet = FILLP_FD_CREATE_FD_SET();
604     if (g_udpIo.readableSet == FILLP_NULL_PTR) {
605         FILLP_LOGERR("Malloc g_udpIo.readableSet failed");
606         return ERR_NORES;
607     }
608 
609     HLIST_INIT(&g_udpIo.listenPcbList);
610 
611     return ERR_OK;
612 }
613 
FtInitGlobalInstPool(void)614 static FILLP_INT FtInitGlobalInstPool(void)
615 {
616     g_spunge->insNum = g_spunge->resConf.maxInstNum;
617     g_spunge->instPool = (struct SpungeInstance *)SpungeAlloc(g_spunge->insNum, sizeof(struct SpungeInstance),
618         SPUNGE_ALLOC_TYPE_MALLOC);
619     if (g_spunge->instPool == FILLP_NULL_PTR) {
620         FILLP_LOGERR("Malloc g_spunge->instPool failed");
621         return ERR_NORES;
622     }
623 
624     return ERR_OK;
625 }
626 
FtInitGlobalSockTable(void)627 static FILLP_INT FtInitGlobalSockTable(void)
628 {
629     g_spunge->sockTable = SpungeCreateSockTable(g_spunge->resConf.maxSockNum);
630     if (g_spunge->sockTable == FILLP_NULL_PTR) {
631         FILLP_LOGERR("Malloc g_spunge->sockTable failed\r\n");
632         return ERR_NORES;
633     }
634     return ERR_OK;
635 }
636 
FtInitGlobalNetPool(void)637 static FILLP_INT FtInitGlobalNetPool(void)
638 {
639     FILLP_UINT netPoolInitSize = FILLP_CONN_ITEM_INIT_NUM;
640 
641     if (netPoolInitSize > g_spunge->resConf.maxConnNum) {
642         netPoolInitSize = g_spunge->resConf.maxConnNum;
643     }
644 
645     DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
646     g_spunge->netPool = DympCreatePool((FILLP_INT)netPoolInitSize, (int)g_spunge->resConf.maxConnNum,
647         sizeof(struct FtNetconn), FILLP_TRUE, &itemOperaCb);
648     if (g_spunge->netPool == FILLP_NULL_PTR) {
649         FILLP_LOGERR("Malloc g_spunge->netPool failed\r\n");
650         return ERR_NORES;
651     }
652 
653     DympSetConsSafe(g_spunge->netPool, FILLP_TRUE);
654     DympSetProdSafe(g_spunge->netPool, FILLP_FALSE);
655     return ERR_OK;
656 }
657 
FtFreeGlobalUdpIo(void)658 static void FtFreeGlobalUdpIo(void)
659 {
660     if (g_udpIo.readSet != FILLP_NULL_PTR) {
661         FILLP_FD_DESTROY_FD_SET(g_udpIo.readSet);
662         g_udpIo.readSet = FILLP_NULL_PTR;
663     }
664 
665     if (g_udpIo.readableSet != FILLP_NULL_PTR) {
666         FILLP_FD_DESTROY_FD_SET(g_udpIo.readableSet);
667         g_udpIo.readableSet = FILLP_NULL_PTR;
668     }
669 }
670 
FtFreeGlobalSpunge(void)671 static void FtFreeGlobalSpunge(void)
672 {
673     if (g_spunge == FILLP_NULL_PTR) {
674         return;
675     }
676     g_spunge->hasInited = FILLP_FALSE;
677 
678     FtFreeEpollResource();
679 
680     if (g_spunge->sockTable != FILLP_NULL_PTR) {
681         SpungeDestroySockTable(g_spunge->sockTable);
682         g_spunge->sockTable = FILLP_NULL_PTR;
683     }
684 
685     if (g_spunge->netPool != FILLP_NULL_PTR) {
686         DympDestroyPool(g_spunge->netPool);
687         g_spunge->netPool = FILLP_NULL_PTR;
688     }
689 
690     if (g_spunge->instPool != FILLP_NULL_PTR) {
691         SpungeFree(g_spunge->instPool, SPUNGE_ALLOC_TYPE_MALLOC);
692         g_spunge->instPool = FILLP_NULL_PTR;
693     }
694 
695     FtFreeGlobalUdpIo();
696 
697     SpungeFree(g_spunge, SPUNGE_ALLOC_TYPE_MALLOC);
698     g_spunge = FILLP_NULL_PTR;
699 }
700 
FtModuleInit(void)701 static FILLP_INT FtModuleInit(void)
702 {
703     FILLP_INT err;
704     int ret;
705 
706     err = FtInitGlobalUdpIo();
707     if (err != ERR_OK) {
708         return err;
709     }
710 
711     err = FtInitGlobalInstPool();
712     if (err != ERR_OK) {
713         return err;
714     }
715 
716     err = FtInitGlobalSockTable();
717     if (err != ERR_OK) {
718         return err;
719     }
720 
721     err = FtInitGlobalNetPool();
722     if (err != ERR_OK) {
723         return err;
724     }
725 
726     err = FtAllocateEpollResource();
727     if (err != ERR_OK) {
728         FILLP_LOGERR("Alloc epoll resource fail");
729         return err;
730     }
731 
732     ret = SYS_ARCH_SEM_INIT(&g_resDeinitSem, 0);
733     if (ret != FILLP_OK) {
734         FILLP_LOGERR("deinit sem init failed. ");
735         return ERR_NORES;
736     }
737 
738     err = SpungeAllocInstRes();
739     if (err != ERR_OK) {
740         FILLP_LOGERR("Spunge init instances resource fail");
741         (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
742         return err;
743     }
744     return ERR_OK;
745 }
746 
FtInit(void)747 FILLP_INT FtInit(void)
748 {
749     FILLP_INT err;
750 
751     FILLP_LOGBUTT("init stack");
752     if (g_spunge != FILLP_NULL_PTR) {
753         FILLP_LOGERR("Init already done");
754         return ERR_STACK_ALREADY_INITIALD;
755     }
756 
757     if (SpungeCheckCallbacks() != ERR_OK) {
758         FILLP_LOGERR("User has not registered system callback functions \r\n");
759         return ERR_ADP_SYS_CALLBACK_NOT_REGISTERED;
760     }
761 
762     if (SYS_ARCH_INIT() != ERR_OK) {
763         FILLP_LOGERR("SYS_ARCH_INIT ssp failed \r\n");
764         return ERR_NORES;
765     }
766 
767     g_spunge = (struct Spunge *)SpungeAlloc(1, sizeof(struct Spunge), SPUNGE_ALLOC_TYPE_MALLOC);
768     if (g_spunge == FILLP_NULL_PTR) {
769         FILLP_LOGERR("Alloc g_spunge fail");
770         return ERR_NORES;
771     }
772 
773     (void)memset_s(g_spunge, sizeof(struct Spunge), FILLP_NULL_NUM, sizeof(struct Spunge));
774 
775     FtGetSpungeRes(&g_spunge->resConf);
776 
777     err = FtModuleInit();
778     if (err != ERR_OK) {
779         goto ERR_FAIL;
780     }
781 
782     FILLP_LOGBUTT("FillP_init: Spunge mem_zone alloc finished!");
783 
784     FILLP_LOGBUTT("FillP Core init success!");
785     FILLP_LOGBUTT("version " FILLP_VERSION);
786 
787     g_spunge->traceFlag = 0;
788     g_spunge->hasInited = FILLP_TRUE;
789     FILLP_LOGBUTT("Init success");
790     return ERR_OK;
791 
792 ERR_FAIL:
793     FtFreeGlobalSpunge();
794 
795     FILLP_LOGERR("Init fail,clean up");
796     return err;
797 }
798 
799 
800 /* starts from LSB bit position, cnt starts from 0 */
801 #define SPUNGE_SET_BIT(num, pos) ((num) |= (1U << (pos)))
802 
SpungZeroInstance(void)803 static void SpungZeroInstance(void)
804 {
805     FILLP_BOOL hasDeinitBlked = g_spunge->hasDeinitBlked;
806     /* This logic can work for 32 instance in future need to change if more number of
807         instance are supported */
808     /* instance 0 is already closed so mark in bit field. */
809     FILLP_UINT32 instBitClosed = 1;
810     FILLP_UINT32 i;
811     FILLP_UINT32 instAllBit = (FILLP_UINT32)((1U << g_spunge->insNum) - 1);
812 
813     /* In case of blocking FtDestroy 0th instance should post semaphore after all instance threads are exited,
814         and all resources are release. In case on non blocking FtDestroy 0th instance should free free all
815         reasource no need to post semaphore and need to release semaphore also
816         Wait for other instance threads to release respective resource and exit thread */
817     while (instBitClosed != instAllBit) {
818         FILLP_SLEEP_MS(1);
819         for (i = 1; i < g_spunge->insNum; i++) {
820             if (g_spunge->instPool[i].hasInited == 0) {
821                 /* Mark as closed */
822                 SPUNGE_SET_BIT(instBitClosed, i);
823             }
824         }
825     }
826 
827     /* Free all global resource and reset parameters */
828     InitGlobalResourceDefault();
829     InitGlobalAppResourceDefault();
830     FtFreeGlobalSpunge();
831     FillpSysOsDeinit();
832     FillpDfxDoEvtCbSet(FILLP_NULL_PTR, FILLP_NULL_PTR);
833 
834     /* Signal or release deinit sem */
835     if (hasDeinitBlked) {
836         (void)SYS_ARCH_SEM_POST(&g_resDeinitSem);
837     } else {
838         (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
839     }
840 }
841 
SpungeDestroyInstance(struct SpungeInstance * inst)842 void SpungeDestroyInstance(struct SpungeInstance *inst)
843 {
844     FILLP_INT instIdx = inst->instIndex;
845 
846     SpungeFreeInstanceResource(inst);
847 
848     if (instIdx == 0) {
849         SpungZeroInstance();
850     }
851 
852     FILLP_LOGERR("Destroy finish index: %d", instIdx);
853 }
854 
FtDestroyInner(FILLP_INT block)855 static void FtDestroyInner(FILLP_INT block)
856 {
857     FILLP_UINT i;
858     FILLP_LOGERR("Destroy stack start, block(%d)", block);
859 
860     if ((g_spunge == FILLP_NULL_PTR) || (!g_spunge->hasInited)) {
861         return;
862     }
863 
864     g_spunge->hasDeinitBlked = (FILLP_BOOL)block;
865 
866     /*
867      * should check g_spunge again,
868      * because the g_spunge may be freed in main thread after all the inst is freed
869      */
870     for (i = 0; g_spunge != FILLP_NULL_PTR && i < g_spunge->insNum; i++) {
871         (void)SYS_ARCH_SEM_WAIT(&g_spunge->instPool[i].threadSem);
872         g_spunge->instPool[i].waitTobeCoreKilled = FILLP_TRUE;
873         (void)SYS_ARCH_SEM_POST(&g_spunge->instPool[i].threadSem);
874     }
875 
876     if ((block) && (SYS_ARCH_SEM_WAIT(&g_resDeinitSem) == 0)) {
877         (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
878     }
879 
880     FILLP_LOGERR("Destroy finished");
881     return;
882 }
883 
FtDestroy(void)884 void FtDestroy(void)
885 {
886     FtDestroyInner(FILLP_TRUE);
887 }
888 
FtDestroyNonblock(void)889 void FtDestroyNonblock(void)
890 {
891     FtDestroyInner(FILLP_FALSE);
892 }
893 
SpungeHandleMsgCycle(struct SpungeInstance * inst)894 void SpungeHandleMsgCycle(struct SpungeInstance *inst)
895 {
896     struct SpungeMsg *msg = FILLP_NULL_PTR;
897     FILLP_INT ret;
898     FILLP_ULONG i;
899 
900     FILLP_ULONG boxItems = FillpQueueValidOnes(inst->msgBox);
901     if ((boxItems == 0) || (boxItems > inst->msgBox->size)) {
902         boxItems = (FILLP_ULONG)inst->msgBox->size;
903     }
904 
905     for (i = 0; i < boxItems; i++) {
906         ret = FillpQueuePop(inst->msgBox, (void *)&msg, 1);
907         if (ret <= 0) {
908             break;
909         }
910         if (msg->msgType < MSG_TYPE_END) {
911             g_msgHandler[msg->msgType](msg->value, inst);
912         }
913         if (!msg->block) {
914             DympFree(msg);
915         } else {
916             (void)SYS_ARCH_SEM_POST(&msg->syncSem);
917         }
918     }
919 
920     return;
921 }
922 
SpungeLoopCheckUnsendBox(struct SpungeInstance * inst)923 static void SpungeLoopCheckUnsendBox(struct SpungeInstance *inst)
924 {
925     int j;
926     FillpQueue *boxQueue = inst->unsendBox[0];
927     struct FillpPcbItem **item = inst->unsendItem;
928     struct FtNetconn *netconn = FILLP_NULL_PTR;
929     struct FillpPcb *fpcb = FILLP_NULL_PTR;
930     FILLP_INT count;
931 
932     count = FillpQueuePop(boxQueue, (void *)item, FILLP_UNSEND_BOX_LOOP_CHECK_BURST);
933     if (count <= 0) {
934         return;
935     }
936 
937     for (j = 0; j < count; j++) {
938         netconn = (struct FtNetconn *)item[j]->netconn;
939         if (netconn == FILLP_NULL_PTR) {
940             FillpFreeBufItem(item[j]);
941             continue;
942         }
943 
944         fpcb = &(netconn->pcb->fpcb);
945         HlistAddTail(&fpcb->send.unSendList, &item[j]->unsendNode);
946         (void)FillpFrameAddItem(&fpcb->frameHandle, item[j]);
947         FillpPcbSendFc(fpcb);
948     }
949 }
950 
SpungeDelay(struct SpungeInstance * inst,FILLP_LLONG curTime)951 static FILLP_BOOL SpungeDelay(struct SpungeInstance *inst, FILLP_LLONG curTime)
952 {
953     FILLP_LLONG timePass = curTime - inst->curTime;
954 
955     FILLP_LLONG minSendInterval = (FILLP_LLONG)((FILLP_ULLONG)inst->minSendInterval >> FILLP_TIME_PRECISION);
956     if ((timePass > minSendInterval) && (timePass > FILLP_MINIMUM_SELECT_TIME)) {
957         minSendInterval = 0;
958     } else if (minSendInterval < FILLP_MINIMUM_SELECT_TIME) {
959         minSendInterval = FILLP_MINIMUM_SELECT_TIME;
960     }
961 
962     if (SYS_ARCH_SEM_POST(&inst->threadSem)) {
963         FILLP_LOGWAR("sem wait failed");
964     }
965     if (inst->pcbList.list.size > 0) {
966         (void)SysioSelect((FILLP_INT)minSendInterval);
967     } else {
968         FILLP_SLEEP_MS((FILLP_UINT)FILLP_UTILS_US2MS(minSendInterval));
969     }
970     if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
971         FILLP_LOGWAR("sem wait failed");
972     }
973     return FILLP_TRUE;
974 }
975 
SpungeMainDelay(struct SpungeInstance * inst)976 static FILLP_BOOL SpungeMainDelay(struct SpungeInstance *inst)
977 {
978     FILLP_BOOL isTimeout = FILLP_TRUE;
979     FILLP_LLONG curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
980 
981     if (g_resource.common.fullCpuEnable && (inst->stb.tbFpcbLists.size > 0)) {
982         (void)SysioSelect(0);
983         inst->curTime = curTime;
984         return isTimeout;
985     }
986 
987     if (curTime < inst->curTime) {
988         FILLP_LOGERR("System Time has been changed to past value");
989         return isTimeout;
990     }
991     isTimeout = SpungeDelay(inst, curTime);
992     curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
993     if (curTime < inst->curTime) {
994         FILLP_LOGERR("System Time has been changed to past value\r\n");
995         return isTimeout;
996     }
997 
998     inst->curTime = curTime;
999     inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
1000     return isTimeout;
1001 }
1002 
FillpServerRecvRateAdjustment(struct SpungeInstance * inst,FILLP_UINT32 calcRecvTotalRate,FILLP_INT realRecvConn,FILLP_UINT32 * connRecvCalLimit)1003 void FillpServerRecvRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcRecvTotalRate, FILLP_INT realRecvConn,
1004     FILLP_UINT32 *connRecvCalLimit)
1005 {
1006     static FILLP_UINT8 recvStableState = 0;
1007     static FILLP_UINT32 prevRecvTotRate = 0;
1008     static const FILLP_UINT32 maxCalcRecvRate = 0;
1009 
1010     if ((calcRecvTotalRate > (RECV_RATE_PAR_LOW * prevRecvTotRate)) &&
1011         (calcRecvTotalRate < (RECV_RATE_PAT_HIGH * prevRecvTotRate))) {
1012         if (recvStableState < RECV_STATE_THRESHOLD) {
1013             recvStableState++;
1014         }
1015     } else {
1016         if (recvStableState > 0) {
1017             recvStableState--;
1018         }
1019     }
1020 
1021     prevRecvTotRate = calcRecvTotalRate;
1022 
1023     /* Give some space for every connection to grow, since if the network
1024     conditions are varying for every connection */
1025     /* If the sum of rate of all connections is less than the historical max
1026     recv rate, then allow to grow */
1027     if (recvStableState < FILLP_FC_STABLESTATE_VAL_2) {
1028         calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
1029     } else if (calcRecvTotalRate < (maxCalcRecvRate * FILLP_FC_SEND_RATE_MULTIPLE_FACTOR)) {
1030         /* Give the enough room for the client to grow the bandwidth */
1031         calcRecvTotalRate = maxCalcRecvRate;
1032     } else {
1033         /* Give 5% room for connections to grow, so that it can achieve the max
1034             network goodput */
1035         calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
1036     }
1037 
1038     /* If the sum of received rate of all the connections is more than the configured
1039     rate, then limit it to configured rate.
1040     Rate should not exceed the configured value */
1041     if (calcRecvTotalRate > inst->rateControl.recv.maxRate) {
1042         calcRecvTotalRate = inst->rateControl.recv.maxRate;
1043     }
1044 
1045     if (realRecvConn > 0) {
1046         *connRecvCalLimit = (FILLP_UINT32)((double)calcRecvTotalRate / realRecvConn);
1047     } else {
1048         /* If there are no connections which are active and connected, then set
1049         the rate limit for every connection to maximum limit */
1050         *connRecvCalLimit = inst->rateControl.recv.maxRate;
1051     }
1052     /* End of rate adjustment for Data receiving at server side */
1053     return;
1054 }
1055 
FillpServerSendRateAdjustment(struct SpungeInstance * inst,FILLP_UINT32 calcSendTotalRate,FILLP_INT realSendConn,FILLP_UINT32 * connSendCalLimit)1056 void FillpServerSendRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcSendTotalRate, FILLP_INT realSendConn,
1057     FILLP_UINT32 *connSendCalLimit)
1058 {
1059     static FILLP_UINT8 sendStableState = 0;
1060     static FILLP_UINT32 prevSendTotRate = 0;
1061     static const FILLP_UINT32 maxCalcSendRate = 0;
1062 
1063     if ((calcSendTotalRate > (FILLP_FC_PREV_ADJUSTMENT_RATE_LOW_VAL * prevSendTotRate)) &&
1064         (calcSendTotalRate < (FILLP_FC_PREV_ADJUSTMENT_RATE_HIGH_VAL * prevSendTotRate))) {
1065         if (sendStableState < FILLP_FC_STABLESTATE_VAL_1) {
1066             sendStableState++;
1067         }
1068     } else {
1069         if (sendStableState > 0) {
1070             sendStableState--;
1071         }
1072     }
1073 
1074     prevSendTotRate = calcSendTotalRate;
1075 
1076     /* Give some space for every connection to grow, since if the network
1077     conditions are varying for every connection */
1078     /* If the sum of rate of all connections is less than the historical max
1079     recv rate, then allow to grow */
1080     if (sendStableState < FILLP_FC_STABLESTATE_VAL_2) {
1081         calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
1082     } else if (calcSendTotalRate < (maxCalcSendRate * FILLP_FC_SEND_RATE_MULTIPLE_FACTOR)) {
1083         calcSendTotalRate = maxCalcSendRate;
1084     } else {
1085         /* Give 5% room for connections to grow, so that it can achieve the max
1086             network goodput */
1087         calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
1088     }
1089 
1090     /* If the sum of sending rate as acked by PACK for all the connections is
1091     more than the configured rate, then limit it to configured rate.
1092     Rate should not exceed the configured value */
1093     if (calcSendTotalRate > inst->rateControl.send.maxRate) {
1094         calcSendTotalRate = inst->rateControl.send.maxRate;
1095     }
1096 
1097     if (realSendConn > 0) {
1098         *connSendCalLimit = (FILLP_UINT32)((double)calcSendTotalRate / realSendConn);
1099     } else {
1100         /* If there are no connections which are active and connected, then set
1101         the rate limit for every connection to maximum limit */
1102         *connSendCalLimit = inst->rateControl.send.maxRate;
1103     }
1104 
1105     /* End of rate adjustment for Data receiving at server side */
1106     return;
1107 }
1108 
FillpCalculateFairness(struct SpungeInstance * inst)1109 void FillpCalculateFairness(struct SpungeInstance *inst)
1110 {
1111     struct HlistNode *pcbNode = FILLP_NULL_PTR;
1112     struct SpungePcb *pcb = FILLP_NULL_PTR;
1113     FILLP_INT realSendConn = 0;
1114     FILLP_INT realRecvConn = 0;
1115     struct FtNetconn *conn = FILLP_NULL_PTR;
1116     FILLP_UINT8 connState;
1117     FILLP_UINT32 connRecvCalLimit;
1118     FILLP_UINT32 connSendCalLimit;
1119     FILLP_UINT32 calcRecvTotalRate = 0;
1120     FILLP_UINT32 calcSendTotalRate = 0;
1121 
1122     pcbNode = HLIST_FIRST(&inst->pcbList.list);
1123     while (pcbNode != FILLP_NULL_PTR) {
1124         pcb = SpungePcbListNodeEntry(pcbNode);
1125         pcbNode = pcbNode->next;
1126         conn = (struct FtNetconn *)pcb->conn;
1127 
1128         connState = NETCONN_GET_STATE(conn);
1129         if (connState > CONN_STATE_CONNECTED) {
1130             /* Connection state is greater than the connected state, so skip and continue */
1131             continue;
1132         }
1133 
1134         if (pcb->fpcb.statistics.pack.periodRecvRate > FILLP_DEFAULT_MIN_RATE) {
1135             realRecvConn++;
1136         }
1137 
1138         if (pcb->fpcb.statistics.pack.periodSendRate > FILLP_DEFAULT_MIN_RATE) {
1139             realSendConn++;
1140         }
1141 
1142         /* Calculate for Data receiving on server side */
1143         calcRecvTotalRate = calcRecvTotalRate + pcb->fpcb.statistics.pack.periodRecvRate;
1144 
1145         /* Calculate for Data sending from server side */
1146         calcSendTotalRate = calcSendTotalRate + pcb->fpcb.statistics.pack.periodAckByPackRate;
1147     }
1148 
1149     /* Calculation of rate adjustment for Data receiving at server side */
1150     FillpServerRecvRateAdjustment(inst, calcRecvTotalRate, realRecvConn, &connRecvCalLimit);
1151 
1152     /* Calculation of rate adjustment for Data Sending at server side */
1153     FillpServerSendRateAdjustment(inst, calcSendTotalRate, realSendConn, &connSendCalLimit);
1154 
1155     pcbNode = HLIST_FIRST(&inst->pcbList.list);
1156     while (pcbNode != FILLP_NULL_PTR) {
1157         pcb = SpungePcbListNodeEntry(pcbNode);
1158         pcbNode = pcbNode->next;
1159 
1160         /* The rate is set to all the connections irrespective of whether the
1161         connection is idle or not, so that, once the connection starts pumping
1162         the data, it will have enough window to start with.
1163         All this algorithm will adjust the rate of all the connections accordingly */
1164         pcb->rateControl.recv.curMaxRateLimitation = connRecvCalLimit;
1165         pcb->fpcb.recv.oppositeSetRate = pcb->rateControl.recv.curMaxRateLimitation;
1166 
1167         pcb->rateControl.send.curMaxRateLimitation = connSendCalLimit;
1168         pcb->fpcb.send.flowControl.sendRateLimit = pcb->rateControl.send.curMaxRateLimitation;
1169     }
1170 
1171     return;
1172 }
1173 
FillpKillCore(void)1174 FILLP_BOOL FillpKillCore(void)
1175 {
1176     FILLP_UINT16 i;
1177     for (i = 0; i < SYS_ARCH_ATOMIC_READ(&g_spunge->sockTable->used); i++) {
1178         struct FtSocket *sock = g_spunge->sockTable->sockPool[i];
1179 
1180         if ((sock->allocState != SOCK_ALLOC_STATE_FREE)) {
1181             return FILLP_FALSE;
1182         }
1183     }
1184 
1185     return FILLP_TRUE;
1186 }
1187 
FillpCheckPcbNackListToSend(void * args)1188 void FillpCheckPcbNackListToSend(void *args)
1189 {
1190     struct SpungePcb *pcb = ((struct FillpPcb *)args)->spcb;
1191     struct Hlist *nackList = FILLP_NULL_PTR;
1192     FILLP_LLONG curTime;
1193     struct HlistNode *node = FILLP_NULL_PTR;
1194     struct HlistNode *tmp = FILLP_NULL_PTR;
1195 
1196     if (pcb == FILLP_NULL_PTR) {
1197         FILLP_LOGERR("spunge_pcb is NULL");
1198         return;
1199     }
1200 
1201     nackList = &(pcb->fpcb.recv.nackList);
1202     if (nackList->size == 0) {
1203         return;
1204     }
1205 
1206     curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
1207     node = HLIST_FIRST(nackList);
1208     while (node != FILLP_NULL_PTR) {
1209         struct FillpNackNode *nackNode = FillpNackNodeEntry(node);
1210         FILLP_LLONG timestamp = nackNode->timestamp;
1211         /*
1212         Commenting the timeout check again here, since the timing wheel
1213         will ensure that the time has elapsed before invoking this timeout
1214         function
1215         */
1216         if (curTime > timestamp) {
1217             FILLP_UINT32 startPktNum = nackNode->startPktNum;
1218             FILLP_UINT32 endPktNum = nackNode->endPktNum;
1219             FillpSendNack(&(pcb->fpcb), startPktNum, endPktNum);
1220             tmp = node;
1221             node = node->next;
1222             HlistDelete(nackList, tmp);
1223             SpungeFree(nackNode, SPUNGE_ALLOC_TYPE_CALLOC);
1224             nackNode = FILLP_NULL_PTR;
1225         } else {
1226             break;
1227         }
1228     }
1229 
1230     /* if all the delay NACKs are sent out, then stop the timer */
1231     if (nackList->size > 0) {
1232         FillpEnableDelayNackTimer((struct FillpPcb *)args);
1233     }
1234 }
1235 
SpinstLoopMacTimerChecker(void * p)1236 void SpinstLoopMacTimerChecker(void *p)
1237 {
1238     struct SpungeInstance *inst = (struct SpungeInstance *)p;
1239     /* Check server cookie Refresh */
1240     /* Duration is put as 30minutes, 1 Minute = 60,000 Milliseconds
1241      */
1242     if (((inst->curTime - (FILLP_LLONG)inst->macInfo.switchOverTime) > FILLP_KEY_REFRESH_TIME)) {
1243         FillpMacTimerExpire(&inst->macInfo, inst->curTime);
1244     }
1245     if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->macTimerNode)) {
1246         FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
1247             &inst->macTimerNode);
1248     }
1249 }
1250 
SpinstLoopFairnessChecker(void * p)1251 void SpinstLoopFairnessChecker(void *p)
1252 {
1253     struct SpungeInstance *inst = (struct SpungeInstance *)p;
1254 
1255     if ((g_resource.flowControl.supportFairness == FILLP_FAIRNESS_TYPE_EQUAL_WEIGHT) &&
1256         (inst->rateControl.connectionNum > 0)) {
1257         inst->rateControl.lastControlTime = inst->curTime;
1258         FillpCalculateFairness(inst);
1259     }
1260 
1261     if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->fairTimerNode)) {
1262         FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
1263             &inst->fairTimerNode);
1264     }
1265 }
1266 
SpungeEnableTokenTimer(struct SpungeTokenBucke * stb)1267 void SpungeEnableTokenTimer(struct SpungeTokenBucke *stb)
1268 {
1269     if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1270         FillpTimingWheelAddTimer(&stb->inst->timingWheel, stb->tockenTimerNode.interval + stb->inst->curTime,
1271             &stb->tockenTimerNode);
1272     }
1273 }
1274 
SpungeDisableTokenTimer(struct SpungeTokenBucke * stb)1275 void SpungeDisableTokenTimer(struct SpungeTokenBucke *stb)
1276 {
1277     if (FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1278         FillpTimingWheelDelTimer(stb->tockenTimerNode.wheel, &stb->tockenTimerNode);
1279     }
1280 }
1281 
SpungeTokenTimerCb(void * p)1282 void SpungeTokenTimerCb(void *p)
1283 {
1284     struct SpungeTokenBucke *stb = (struct SpungeTokenBucke *)p;
1285     struct SpungeInstance *inst = (struct SpungeInstance *)stb->inst;
1286     FILLP_ULLONG bitAdded;
1287     FILLP_UINT32 tokens;
1288 
1289     if (stb->rate != g_resource.flowControl.limitRate) {
1290         FILLP_UINT32 rate_bck = stb->rate;
1291         stb->rate = g_resource.flowControl.limitRate;
1292         stb->tokenCount = 0;
1293 
1294         if (stb->rate != 0) {
1295             stb->tockenTimerNode.interval = (FILLP_UINT32)(
1296                 ((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1297             if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1298                 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1299             }
1300         } else {
1301             stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1302         }
1303 
1304         FILLP_LOGINF("limite rate change from:%u to:%u, timer_interval:%u, maxPktSize:%u", rate_bck, stb->rate,
1305             stb->tockenTimerNode.interval, stb->maxPktSize);
1306     }
1307 
1308     bitAdded = (FILLP_ULLONG)(inst->curTime - stb->lastTime) * (FILLP_ULLONG)stb->rate;
1309     stb->lastTime = inst->curTime;
1310     tokens = (FILLP_UINT32)((bitAdded / (FILLP_ULLONG)FILLP_BPS_TO_KBPS) >> BIT_MOVE_CNT);
1311     if ((tokens < stb->maxPktSize) || (stb->tokenCount < stb->maxPktSize)) {
1312         stb->tokenCount += tokens;
1313     } else {
1314         stb->tokenCount = tokens;
1315     }
1316 
1317     if (stb->tockenTimerNode.interval != SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1318         SpungeEnableTokenTimer(stb);
1319     }
1320 }
1321 
SpungeItemRouteByToken(struct FillpPcbItem * item,struct FillpPcb * fpcb)1322 FILLP_INT SpungeItemRouteByToken(struct FillpPcbItem *item, struct FillpPcb *fpcb)
1323 {
1324     struct SpungeTokenBucke *stb;
1325     FILLP_INT ret = ERR_OK;
1326 
1327     stb = &fpcb->pcbInst->stb;
1328 
1329     if (stb->tockenTimerNode.interval == SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1330         SpungeTokenTimerCb(stb);
1331     }
1332     if ((stb->rate == 0) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) { /* no limit or limit -> nolimit */
1333         ret = FillpSendItem(item, fpcb);
1334     } else if ((stb->tokenCount >= (FILLP_UINT32)item->dataLen) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) {
1335         ret = FillpSendItem(item, fpcb);
1336         if (ret == ERR_OK) {
1337             stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1338         }
1339     } else {
1340         if (SkipListInsert(&fpcb->send.itemWaitTokenLists, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1341             /* this can't be happen */
1342             FILLP_LOGERR("fillp_sock_id:%d Can't add item <%u,%u> to itemWaitTokenLists", FILLP_GET_SOCKET(fpcb)->index,
1343                 item->seqNum, item->dataLen);
1344             FillpFreeBufItem(item);
1345             (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1346 #ifdef SOCK_SEND_SEM
1347             (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1348 #endif /* SOCK_SEND_SEM */
1349         } else {
1350             stb->waitPktCount++;
1351         }
1352     }
1353 
1354     return ret;
1355 }
1356 
SpungeClearItemWaitTokenList(struct SpungeTokenBucke * stb)1357 static void SpungeClearItemWaitTokenList(struct SpungeTokenBucke *stb)
1358 {
1359     struct HlistNode *fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1360     struct FillpPcb *fpcb = FILLP_NULL_PTR;
1361     struct FillpPcbItem *item = FILLP_NULL_PTR;
1362 
1363     while (fpcbNode != FILLP_NULL_PTR) {
1364         fpcb = FillpPcbStbNodeEntry(fpcbNode);
1365         fpcbNode = fpcbNode->next;
1366         item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1367         while (item != FILLP_NULL_PTR) {
1368             stb->waitPktCount--;
1369             /* here item should move to unrecvList, not directly send by udp,
1370                or the sendrate may be over the max send rate */
1371             if (SkipListInsert(&fpcb->send.unrecvList, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1372                 FillpFreeBufItem(item);
1373                 (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1374 #ifdef SOCK_SEND_SEM
1375                 (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1376 #endif /* SOCK_SEND_SEM */
1377             } else if (item->sendCount > 0) {
1378                 fpcb->send.unrecvRedunListBytes += item->dataLen;
1379             }
1380             item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1381         }
1382 
1383         if (fpcb->send.unrecvList.nodeNum != 0) {
1384             FillpEnableSendTimer(fpcb);
1385         }
1386     }
1387 
1388     if (stb->waitPktCount != 0) {
1389         FILLP_LOGERR("waitPktCount %llu is not 0", stb->waitPktCount);
1390         stb->waitPktCount = 0;
1391     }
1392     stb->fpcbCur = HLIST_FIRST(&(stb->tbFpcbLists));
1393     return;
1394 }
1395 
SpungeCheckItemWaitTokenList(struct SpungeTokenBucke * stb)1396 void SpungeCheckItemWaitTokenList(struct SpungeTokenBucke *stb)
1397 {
1398     struct HlistNode *fpcbNode = FILLP_NULL_PTR;
1399     struct SkipListNode *node = FILLP_NULL_PTR;
1400     struct FillpPcb *fpcb = FILLP_NULL_PTR;
1401     struct FillpPcbItem *item = FILLP_NULL_PTR;
1402     FILLP_UINT32 fpcbCount = (FILLP_UINT32)stb->tbFpcbLists.size;
1403     FILLP_UINT32 waitListEmptyCount = 0;
1404     FILLP_INT err;
1405 
1406     if (stb->waitPktCount == 0) {
1407         return;
1408     }
1409 
1410     /* stb->rate change from !0 to 0, need to move all item form  itemWaitTokenLists to unSendList */
1411     if (stb->rate == 0) {
1412         SpungeClearItemWaitTokenList(stb);
1413         return;
1414     }
1415 
1416     fpcbNode = stb->fpcbCur;
1417     while ((stb->tokenCount > 0) && (stb->waitPktCount > 0) && (waitListEmptyCount < fpcbCount)) {
1418         if (fpcbNode == FILLP_NULL_PTR) {
1419             fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1420         }
1421 
1422         fpcb = FillpPcbStbNodeEntry(fpcbNode);
1423         node = SkipListGetPop(&(fpcb->send.itemWaitTokenLists));
1424         if (node == FILLP_NULL_PTR) {
1425             fpcbNode = fpcbNode->next;
1426             waitListEmptyCount++;
1427             continue;
1428         }
1429 
1430         item = (struct FillpPcbItem *)node->item;
1431         if (stb->tokenCount < item->dataLen) {
1432             break;
1433         }
1434 
1435         stb->waitPktCount--;
1436         (void)SkipListPopValue(&fpcb->send.itemWaitTokenLists);
1437         err = FillpSendItem(item, fpcb);
1438         if (err == ERR_OK) {
1439             stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1440         }
1441         fpcbNode = fpcbNode->next;
1442         waitListEmptyCount = 0;
1443     }
1444 
1445     stb->fpcbCur = fpcbNode;
1446 }
1447 
SpungeInitTokenBucket(struct SpungeInstance * inst)1448 void SpungeInitTokenBucket(struct SpungeInstance *inst)
1449 {
1450     struct SpungeTokenBucke *stb = &inst->stb;
1451 
1452     stb->inst = inst;
1453     stb->lastTime = inst->curTime;
1454     stb->rate = g_resource.flowControl.limitRate;
1455     stb->waitPktCount = 0;
1456     stb->tokenCount = 0;
1457     stb->maxPktSize = (FILLP_UINT32)g_appResource.flowControl.pktSize;
1458 
1459     stb->fpcbCur = FILLP_NULL_PTR;
1460     HLIST_INIT(&(stb->tbFpcbLists));
1461 
1462     FILLP_TIMING_WHEEL_INIT_NODE(&stb->tockenTimerNode);
1463     stb->tockenTimerNode.cbNode.cb = SpungeTokenTimerCb;
1464     stb->tockenTimerNode.cbNode.arg = (void *)stb;
1465     if (stb->rate != 0) {
1466         stb->tockenTimerNode.interval =
1467             (FILLP_UINT32)(((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1468         if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1469             stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1470         }
1471     } else {
1472         stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1473     }
1474 
1475     FILLP_LOGINF("limite rate:%u, timer_interval:%u, maxPktSize:%u", stb->rate, stb->tockenTimerNode.interval,
1476         stb->maxPktSize);
1477     SpungeEnableTokenTimer(stb);
1478 
1479     return;
1480 }
1481 
SpungeTokenBucketAddFpcb(struct FillpPcb * fpcb)1482 void SpungeTokenBucketAddFpcb(struct FillpPcb *fpcb)
1483 {
1484     struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1485 
1486     if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1487         return;
1488     }
1489 
1490     stb = &fpcb->pcbInst->stb;
1491     if (stb->maxPktSize < (FILLP_UINT32)fpcb->pktSize) {
1492         stb->maxPktSize = (FILLP_UINT32)fpcb->pktSize;
1493     }
1494 
1495     HLIST_INIT_NODE(&(fpcb->stbNode));
1496     HlistAddTail(&stb->tbFpcbLists, &(fpcb->stbNode));
1497     FILLP_LOGINF("fillp_sock_id:%d, maxPktSize:%u,"
1498         "limitRate:%u",
1499         FILLP_GET_SOCKET(fpcb)->index, stb->maxPktSize, stb->rate);
1500 }
1501 
SpungeTokenBucketDelFpcb(struct FillpPcb * fpcb)1502 void SpungeTokenBucketDelFpcb(struct FillpPcb *fpcb)
1503 {
1504     struct HlistNode *node = FILLP_NULL_PTR;
1505     struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1506 
1507     if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1508         return;
1509     }
1510 
1511     stb = &fpcb->pcbInst->stb;
1512     if ((stb->fpcbCur != FILLP_NULL_PTR) && (stb->fpcbCur == &(fpcb->stbNode))) {
1513         stb->fpcbCur = stb->fpcbCur->next;
1514     }
1515 
1516     node = HLIST_FIRST(&(stb->tbFpcbLists));
1517     while (node != FILLP_NULL_PTR) {
1518         if (&(fpcb->stbNode) == node) {
1519             stb->waitPktCount -= (FILLP_ULLONG)fpcb->send.itemWaitTokenLists.nodeNum;
1520             HlistDelete(&(stb->tbFpcbLists), node);
1521             FILLP_LOGINF("fillp_sock_id:%d, limitRate:%u", FILLP_GET_SOCKET(fpcb)->index, stb->rate);
1522             break;
1523         }
1524         node = node->next;
1525     }
1526 }
1527 
1528 /* Return 1 if still alive , or return 0 */
SpinstLoopCheckAlive(struct SpungeInstance * inst)1529 static int SpinstLoopCheckAlive(struct SpungeInstance *inst)
1530 {
1531     if (inst->waitTobeCoreKilled && FillpKillCore()) {
1532         inst->waitTobeCoreKilled = FILLP_FALSE;
1533         return 0;
1534     }
1535 
1536     return 1;
1537 }
1538 
SpinstLoopRecv(struct SpungeInstance * inst)1539 static void SpinstLoopRecv(struct SpungeInstance *inst)
1540 {
1541     struct HlistNode *osSockNode;
1542     int readable = 1;
1543     osSockNode = HLIST_FIRST(&inst->osSockist);
1544     /* Select doesn't work with sendmmsg/recvmmsg, so in that case it is always
1545     set as 1 */
1546     while (osSockNode != FILLP_NULL_PTR) {
1547         struct SockOsSocket *osSock = SockOsListEntry(osSockNode);
1548         if (!g_resource.udp.supportMmsg) {
1549             readable = SysioIsSockReadable((void *)osSock->ioSock);
1550         }
1551         osSockNode = osSockNode->next;
1552 
1553         if (readable) {
1554             SpungeDoRecvCycle(osSock, inst);
1555         }
1556     }
1557 }
1558 
1559 #if !defined(FILLP_LW_LITEOS)
SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance * inst)1560 static void SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance *inst)
1561 {
1562     FILLP_CHAR threadName[SPUNGE_MAX_THREAD_NAME_LENGTH] = {0};
1563     FILLP_UINT8 random = (FILLP_UINT8)(FILLP_RAND() & 0xFF);
1564     (void)inst;
1565     FILLP_INT ret = sprintf_s(threadName, sizeof(threadName), "%s_%u", "Fillp_core", (FILLP_UINT)random);
1566     if (ret < ERR_OK) {
1567         FILLP_LOGWAR("SpungeInstanceMainThread sprintf_s thread name failed(%d), random(%u)", ret, random);
1568     }
1569     (void)SysSetThreadName(threadName, sizeof(threadName));
1570 
1571 #if defined(FILLP_LINUX)
1572     {
1573         pthread_t self;
1574         self = pthread_self();
1575         FILLP_LOGINF("FillP Core threadId:%ld", self);
1576         /* thread resource will be auto recycled
1577            only this detach set if no other thread try to join it */
1578         if (pthread_detach(self)) {
1579             FILLP_LOGERR("Set Detach fail");
1580         }
1581     }
1582 #elif defined(FILLP_WIN32)
1583     FILLP_LOGBUTT("FillP Core threadId:%d", GetCurrentThreadId());
1584 #endif
1585 }
1586 #endif
1587 
SpungeInstanceMainThread(void * p)1588 void SpungeInstanceMainThread(void *p)
1589 {
1590     struct SpungeInstance *inst = FILLP_NULL_PTR;
1591     FILLP_BOOL isTimeout;
1592 
1593     if (p == FILLP_NULL_PTR) {
1594         FILLP_LOGERR("parameter p is NULL");
1595         return;
1596     }
1597 
1598     inst = (struct SpungeInstance *)p;
1599 #if !defined(FILLP_LW_LITEOS)
1600     SpungeSetThreadInfo(inst);
1601 #endif
1602 
1603     if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
1604         FILLP_LOGWAR("sem wait failed");
1605         return;
1606     }
1607     while (inst->hasInited) {
1608         SpungeHandleMsgCycle(inst);
1609         SpungeLoopCheckUnsendBox(inst);
1610         if (!SpinstLoopCheckAlive(inst)) {
1611             break;
1612         }
1613         isTimeout = SpungeMainDelay(inst);
1614         SpinstLoopRecv(inst);
1615 
1616         if (isTimeout == FILLP_TRUE) {
1617             FillpTimingWheelLoopCheck(&inst->timingWheel, inst->curTime);
1618         }
1619 
1620         SpungeCheckItemWaitTokenList(&inst->stb);
1621     }
1622 
1623     SpungeDestroyInstance(inst);
1624 }
1625 
SpungePushRecvdDataToStack(void * arg)1626 void SpungePushRecvdDataToStack(void *arg)
1627 {
1628     struct FillpPcb *pcb = (struct FillpPcb *)arg;
1629     struct FillpPcbItem *item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1630     while (item != FILLP_NULL_PTR) {
1631         FillpDataToStack(pcb, item);
1632         item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1633     }
1634 
1635     FillpEnableDataBurstTimer(pcb);
1636 
1637     return;
1638 }
1639 
1640 #ifdef __cplusplus
1641 }
1642 #endif
1643