1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "spunge_core.h"
17 #ifdef FILLP_LINUX
18 #include <errno.h>
19 #endif
20 #include <stdio.h>
21 #include "securec.h"
22 #include "sysio.h"
23 #include "res.h"
24 #include "socket_common.h"
25 #include "fillp_flow_control.h"
26 #include "timing_wheel.h"
27 #include "fillp_buf_item.h"
28 #include "callbacks.h"
29 #include "fillp_common.h"
30 #include "spunge.h"
31 #include "spunge_stack.h"
32 #include "spunge_message.h"
33 #include "fillp_output.h"
34 #include "fillp_input.h"
35 #include "fillp_dfx.h"
36
37 #ifdef __cplusplus
38 extern "C" {
39 #endif
40
/* Teardown hand-off semaphore: initialised with count 0 in FtModuleInit, waited on by the blocking
   destroy path and by the failed-instance-init path, posted (or destroyed) by SpungZeroInstance. */
41 SYS_ARCH_SEM g_resDeinitSem;
/* Not referenced anywhere in the part of the file visible here. */
42 #define BIT_MOVE_CNT 3
/* Lower / upper multipliers applied to the previous total receive rate in
   FillpServerRecvRateAdjustment; a new rate strictly between the two counts as "stable". */
43 #define RECV_RATE_PAR_LOW 0.98
44 #define RECV_RATE_PAT_HIGH 1.02
/* Upper bound for the receive "stable state" counter in FillpServerRecvRateAdjustment. */
45 #define RECV_STATE_THRESHOLD 10
46
/* Forward declaration: SpungeInstInit calls this before its definition appears below. */
47 void SpungeFreeInstanceResource(struct SpungeInstance *inst);
48
49
SpungeDoRecvCycle(struct SockOsSocket * osSock,struct SpungeInstance * inst)50 void SpungeDoRecvCycle(struct SockOsSocket *osSock, struct SpungeInstance *inst)
51 {
52 FILLP_UINT32 i;
53 struct NetBuf buf;
54 struct SpungePcb *spcb = FILLP_NULL_PTR;
55
56 if (!OS_SOCK_OPS_FUNC_VALID(osSock, fetchPacket)) {
57 return;
58 }
59
60 (void)memset_s(&buf, sizeof(buf), 0, sizeof(buf));
61 buf.p = inst->tmpBuf[0];
62 for (i = 0; i < g_resource.udp.rxBurst; i++) {
63 spcb = osSock->ioSock->ops->fetchPacket((void *)osSock, (void *)&buf, 0);
64 if (spcb != FILLP_NULL_PTR) {
65 FillpDoInput(&spcb->fpcb, &buf, inst);
66 continue;
67 } else {
68 break;
69 }
70 }
71 }
72
/*
 * Work out how many bytes this send cycle is allowed to emit.
 *
 * The number of packets "due" is detaTime / sendInterval (or txBurst when no
 * interval is set). If that does not exceed the socket's txBurst, the budget is
 * derived from sendRate over detaTime plus the remainder carried over from the
 * previous cycle; otherwise the budget is clamped to txBurst full-size packets.
 * In both cases the budget (without the carried remainder) is added to the
 * packExpSendBytes statistic.
 *
 * sendPktNum: out, packet count limit for this cycle.
 * Returns the byte budget for this cycle.
 */
static FILLP_UINT32 SpungeCalExpectedBytes(FILLP_UINT32 *sendPktNum, struct SpungePcb *pcb,
    struct FtSocket *sock, struct FillpFlowControl *flowControl, FILLP_LLONG detaTime)
{
    FILLP_UINT32 budgetBytes;
    FILLP_UINT32 duePkts;

    if (flowControl->sendInterval) {
        duePkts = (FILLP_UINT32)(detaTime / flowControl->sendInterval);
    } else {
        duePkts = sock->resConf.udp.txBurst;
    }

    if (duePkts > (sock->resConf.udp.txBurst)) {
        /* More than one burst is due: cap at a single burst of full-size packets */
        duePkts = sock->resConf.udp.txBurst;
        budgetBytes = (FILLP_UINT32)(duePkts * pcb->fpcb.pktSize);
        pcb->fpcb.statistics.traffic.packExpSendBytes += budgetBytes;
    } else {
        /* sendRate is kbps */
        FILLP_ULLONG bitsDue = (FILLP_ULLONG)(detaTime * flowControl->sendRate / FILLP_ONE_SECOND);
        bitsDue >>= FILLP_TIME_PRECISION;
        budgetBytes = (FILLP_UINT32)(FILLP_UTILS_BIT2BYTE(bitsDue));
        pcb->fpcb.statistics.traffic.packExpSendBytes += budgetBytes;
        /* Unused allowance from the previous cycle is added after the statistic is updated */
        budgetBytes += pcb->fpcb.send.flowControl.remainBytes;
    }

    *sendPktNum = duePkts;
    FILLP_LOGDBG("before_send_cycle fillp_sock_id:%d unRecvNum:%u, unAck:%u\r\n",
        sock->index, pcb->fpcb.send.unrecvList.nodeNum, pcb->fpcb.send.unackList.count);
    return budgetBytes;
}
100
/*
 * Book-keeping after a send attempt.
 *
 * When something was sent and it did not exceed the budget, the sent bytes are
 * added to the packSendBytes statistic and the unused part of the budget is
 * stored as remainBytes. In every other case (nothing sent, or more sent than
 * budgeted) remainBytes is reset to 0 and the statistic is left untouched.
 */
static void SpungeDoSendUpdate(struct SpungePcb *pcb, FILLP_UINT32 sendBytes, FILLP_UINT32 bytesExpected)
{
    if ((sendBytes == 0) || (sendBytes > bytesExpected)) {
        pcb->fpcb.send.flowControl.remainBytes = 0;
        return;
    }

    pcb->fpcb.statistics.traffic.packSendBytes += sendBytes;
    pcb->fpcb.send.flowControl.remainBytes = bytesExpected - sendBytes;
}
110
/*
 * Run one send cycle for a PCB.
 *
 * Computes the byte budget for the elapsed time, lets the flow-control
 * algorithm adjust it (if it registered updateExpectSendBytes), sends when the
 * budget covers at least one packet, and finally enables or disables the send
 * timer depending on whether there is anything left to do.
 *
 * pcb:      PCB to send for; the call is a no-op (with an error log) when pcb,
 *           its conn, or the conn's socket is NULL.
 * inst:     owning instance; its curTime is recorded as the flow-control sendTime.
 * detaTime: time elapsed since the previous cycle, as passed by the caller.
 */
void SpungeDoSendCycle(struct SpungePcb *pcb, struct SpungeInstance *inst, FILLP_LLONG detaTime)
{
    FILLP_UINT32 sendPktNum;
    FILLP_UINT32 sendBytes = 0;
    FILLP_UINT32 tmpBytes = 0;
    FILLP_UINT32 bytesExpected;

    if ((pcb == FILLP_NULL_PTR) || (pcb->conn == FILLP_NULL_PTR)) {
        FILLP_LOGERR("NULL Pointer");
        return;
    }

    FILLP_SIZE_T pktSize = pcb->fpcb.pktSize;
    struct FillpFlowControl *flowControl = &pcb->fpcb.send.flowControl;
    struct FtNetconn *conn = (struct FtNetconn *)pcb->conn;
    struct FtSocket *sock = (struct FtSocket *)conn->sock;

    if (sock == FILLP_NULL_PTR) {
        FILLP_LOGERR("NULL Pointer");
        return;
    }

    flowControl->sendTime = inst->curTime;
    bytesExpected = SpungeCalExpectedBytes(&sendPktNum, pcb, sock, flowControl, detaTime);

    /* The flow control algorithm may need to change bytesExpected for this send cycle
       according to the current status */
    if (pcb->fpcb.algFuncs.updateExpectSendBytes != FILLP_NULL_PTR) {
        pcb->fpcb.algFuncs.updateExpectSendBytes(&pcb->fpcb, &bytesExpected);
    }

    /* If bytesExpected is less than pktSize there is nothing to send; just store it as remainBytes */
    if (bytesExpected >= pktSize) {
        /* Make sure that the sent bytes won't be more than bytesExpected */
        tmpBytes = (FILLP_UINT32)(bytesExpected - pktSize);

        sendBytes = FillpSendOne(&pcb->fpcb, tmpBytes, sendPktNum);
        SpungeDoSendUpdate(pcb, sendBytes, bytesExpected);
    } else {
        pcb->fpcb.send.flowControl.remainBytes = bytesExpected;
    }

    /* Arguments follow the labels in the format string: budget, bytes actually sent, remainder */
    FILLP_LOGDBG("after_send_cycle: fillp_sock_id:%d expected bytes:%u sentBytes:%u remain:%u \r\n",
        sock->index, bytesExpected, sendBytes, pcb->fpcb.send.flowControl.remainBytes);

    /* Keep the send timer armed while any budget or any queued/retransmit data remains */
    if ((pcb->fpcb.send.flowControl.remainBytes) || (!HLIST_EMPTY(&pcb->fpcb.send.unSendList)) ||
        (pcb->fpcb.send.redunList.nodeNum) || (pcb->fpcb.send.unrecvList.nodeNum)) {
        FillpEnableSendTimer(&pcb->fpcb);
    } else {
        FillpDisableSendTimer(&pcb->fpcb);
    }

    return;
}
164
SpungeDestroySockTableSocket(struct FtSocketTable * table,int tableIndex)165 static void SpungeDestroySockTableSocket(struct FtSocketTable *table, int tableIndex)
166 {
167 struct FtSocket *sock = FILLP_NULL_PTR;
168
169 if (table == FILLP_NULL_PTR) {
170 return;
171 }
172
173 sock = table->sockPool[tableIndex];
174 if (sock == FILLP_NULL_PTR) {
175 return;
176 }
177 (void)SYS_ARCH_RWSEM_DESTROY(&sock->sockConnSem);
178 (void)SYS_ARCH_SEM_DESTROY(&sock->connBlockSem);
179 (void)SYS_ARCH_SEM_DESTROY(&sock->sockCloseProtect);
180 (void)SYS_ARCH_SEM_DESTROY(&sock->epollTaskListLock);
181 SpungeFree(sock, SPUNGE_ALLOC_TYPE_CALLOC);
182 table->sockPool[tableIndex] = FILLP_NULL_PTR;
183 }
184
185 /* SFT */
/*
 * Allocate a socket table with room for maxSock sockets.
 *
 * Creates the table object, its free queue (marked consumer- and producer-safe)
 * and the slot array, sets size to maxSock and the used counter to 0, and
 * clears every slot.
 *
 * Returns the new table, or FILLP_NULL_PTR if any allocation fails (everything
 * allocated so far is released first).
 */
struct FtSocketTable *SpungeCreateSockTable(FILLP_UINT maxSock)
{
    FILLP_INT slot;
    struct FtSocketTable *newTable =
        (struct FtSocketTable *)SpungeAlloc(1, sizeof(struct FtSocketTable), SPUNGE_ALLOC_TYPE_CALLOC);
    if (newTable == FILLP_NULL_PTR) {
        FILLP_LOGERR("Failed to allocate memory for socket table \r\n");
        return FILLP_NULL_PTR;
    }

    newTable->freeQueqe = FillpQueueCreate("sock_free_table", (FILLP_SIZE_T)maxSock, SPUNGE_ALLOC_TYPE_CALLOC);
    if (newTable->freeQueqe == FILLP_NULL_PTR) {
        FILLP_LOGERR("Fail to create socket table free queue");
        goto CLEANUP;
    }
    FillpQueueSetConsSafe(newTable->freeQueqe, FILLP_TRUE);
    FillpQueueSetProdSafe(newTable->freeQueqe, FILLP_TRUE);

    newTable->sockPool =
        (struct FtSocket **)SpungeAlloc(maxSock, (FILLP_SIZE_T)sizeof(struct FtSocket *), SPUNGE_ALLOC_TYPE_CALLOC);
    if (newTable->sockPool == FILLP_NULL_PTR) {
        FILLP_LOGERR("Failed to allocate memory for sockPool of socket table");
        goto CLEANUP;
    }

    newTable->size = (FILLP_INT)maxSock;
    SYS_ARCH_ATOMIC_SET(&newTable->used, 0);
    for (slot = 0; slot < newTable->size; slot++) {
        newTable->sockPool[slot] = FILLP_NULL_PTR;
    }
    return newTable;

CLEANUP:
    /* Only members that were actually created are released */
    if (newTable->freeQueqe != FILLP_NULL_PTR) {
        FillpQueueDestroy(newTable->freeQueqe);
        newTable->freeQueqe = FILLP_NULL_PTR;
    }
    if (newTable->sockPool != FILLP_NULL_PTR) {
        SpungeFree(newTable->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
        newTable->sockPool = FILLP_NULL_PTR;
    }
    SpungeFree(newTable, SPUNGE_ALLOC_TYPE_CALLOC);
    return FILLP_NULL_PTR;
}
236
237
238 /* SFT */
SpungeDestroySockTable(struct FtSocketTable * table)239 void SpungeDestroySockTable(struct FtSocketTable *table)
240 {
241 FILLP_INT i;
242
243 for (i = 0; i < SYS_ARCH_ATOMIC_READ(&table->used); i++) {
244 SpungeDestroySockTableSocket(table, i);
245 }
246
247 if (table->freeQueqe != FILLP_NULL_PTR) {
248 FillpQueueDestroy(table->freeQueqe);
249 table->freeQueqe = FILLP_NULL_PTR;
250 }
251
252 if (table->sockPool != FILLP_NULL_PTR) {
253 SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
254 table->sockPool = FILLP_NULL_PTR;
255 }
256
257 /* NULL check for table already done at the caller, and also in the above
258 check, table is dereferenced without validating, so need to check for NULL
259 again here before freeing it */
260 SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
261 }
262
SpungeInstMsgBoxInit(struct SpungeInstance * inst)263 static FILLP_INT SpungeInstMsgBoxInit(struct SpungeInstance *inst)
264 {
265 (void)SYS_ARCH_ATOMIC_SET(&inst->msgUsingCount, 0);
266 inst->msgBox = FillpQueueCreate("spunge_msg_box", g_spunge->resConf.maxMsgItemNum, SPUNGE_ALLOC_TYPE_MALLOC);
267 if (inst->msgBox == FILLP_NULL_PTR) {
268 FILLP_LOGERR("Init inst->msgBox Fail");
269 return ERR_NORES;
270 }
271
272 FillpQueueSetConsSafe(inst->msgBox, FILLP_TRUE);
273 FillpQueueSetProdSafe(inst->msgBox, FILLP_TRUE);
274
275 inst->msgPool = SpungeMsgCreatePool(FILLP_MSG_ITEM_INIT_NUM, (int)g_spunge->resConf.maxMsgItemNum);
276 if (inst->msgPool == FILLP_NULL_PTR) {
277 FILLP_LOGERR("create msg pool fail");
278 return ERR_NORES;
279 }
280
281 DympSetConsSafe(inst->msgPool, FILLP_TRUE);
282 DympSetProdSafe(inst->msgPool, FILLP_TRUE);
283 return ERR_OK;
284 }
285
SpungeInstSendInit(struct SpungeInstance * inst)286 static FILLP_INT SpungeInstSendInit(struct SpungeInstance *inst)
287 {
288 int i;
289
290 /* To control on client sending */
291 inst->rateControl.connectionNum = FILLP_NULL;
292
293 inst->rateControl.recv.maxRate = g_resource.flowControl.maxRecvRate;
294
295 /* To control on server sending */
296 inst->rateControl.send.maxRate = g_resource.flowControl.maxRate;
297
298 inst->thresdSemInited = FILLP_FALSE;
299 int ret = SYS_ARCH_SEM_INIT(&inst->threadSem, 1);
300 if (ret != FILLP_OK) {
301 FILLP_LOGERR("SYS_ARCH_SEM_INIT fails");
302 return ERR_NORES;
303 }
304 inst->thresdSemInited = FILLP_TRUE;
305
306 inst->unsendItem =
307 SpungeAlloc(FILLP_UNSEND_BOX_LOOP_CHECK_BURST, sizeof(struct FillpPcbItem *), SPUNGE_ALLOC_TYPE_CALLOC);
308 if (inst->unsendItem == FILLP_NULL_PTR) {
309 FILLP_LOGERR("inst->unsendItem NULL");
310 return ERR_NORES;
311 }
312
313 for (i = 0; i < FILLP_VLEN; i++) {
314 inst->tmpBuf[i] = SpungeAlloc(1, (sizeof(FILLP_CHAR) * FILLP_MAX_PKT_SIZE), SPUNGE_ALLOC_TYPE_MALLOC);
315 if (inst->tmpBuf[i] == FILLP_NULL_PTR) {
316 FILLP_LOGERR("inst->tmpBuf[%d] is NULL", i);
317 return ERR_NORES;
318 }
319 }
320
321 HLIST_INIT(&inst->sendPcbList);
322 for (i = 0; i < FILLP_INST_UNSEND_BOX_NUM; i++) {
323 inst->unsendBox[i] = FillpQueueCreate("socket_send_box", FILLP_INST_UNSEND_BOX_SIZE, SPUNGE_ALLOC_TYPE_MALLOC);
324 if (inst->unsendBox[i] == FILLP_NULL_PTR) {
325 FILLP_LOGERR("inst->unsendBox[%d] is NULL", i);
326 return ERR_NORES;
327 }
328
329 FillpQueueSetConsSafe(inst->unsendBox[i], FILLP_FALSE);
330 FillpQueueSetProdSafe(inst->unsendBox[i], FILLP_TRUE);
331 }
332 return ERR_OK;
333 }
334
SpungeInstTimerInit(struct SpungeInstance * inst)335 static void SpungeInstTimerInit(struct SpungeInstance *inst)
336 {
337 inst->curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
338
339 (void)memset_s(&inst->macInfo, sizeof(FillpMacInfo), 0, sizeof(FillpMacInfo));
340 FillpMacTimerExpire(&inst->macInfo, inst->curTime);
341
342 FillpTimingWheelInit(&inst->timingWheel, FILLP_TIMING_WHEEL_ACCURACY);
343
344 /* Init the global timers */
345 FtGlobalTimerInit(inst);
346 SpungeInitTokenBucket(inst);
347 }
348
SpungeThreadInit(struct SpungeInstance * inst)349 static FILLP_INT SpungeThreadInit(struct SpungeInstance *inst)
350 {
351 FILLP_THREAD threadId;
352
353 inst->mainThreadParam.func = SpungeInstanceMainThread;
354 inst->mainThreadParam.param = inst;
355 inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
356
357 inst->hasInited = FILLP_TRUE;
358
359 (void)FILLP_SYS_START_NEWTHREAD(&inst->mainThreadParam, &threadId);
360
361 return ERR_OK;
362 }
363
SpungeInstInit(struct SpungeInstance * inst)364 FILLP_INT SpungeInstInit(struct SpungeInstance *inst)
365 {
366 FILLP_INT err;
367
368 if (inst == FILLP_NULL_PTR) {
369 FILLP_LOGERR("Init inst null");
370 return ERR_NULLPTR;
371 }
372
373 if (inst->hasInited) {
374 FILLP_LOGERR("Stack has been inited");
375 return ERR_OK;
376 }
377
378 err = SpungeInstMsgBoxInit(inst);
379 if (err != ERR_OK) {
380 goto FAIL;
381 }
382
383 HLIST_INIT(&inst->osSockist);
384 HLIST_INIT(&inst->pcbList.list);
385
386 err = SpungeInstSendInit(inst);
387 if (err != ERR_OK) {
388 goto FAIL;
389 }
390
391 SpungeInstTimerInit(inst);
392
393 inst->cleanseDataCtr = 0;
394
395 err = SpungeThreadInit(inst);
396 if (err != ERR_OK) {
397 goto FAIL;
398 }
399
400 return ERR_OK;
401
402 FAIL:
403 SpungeFreeInstanceResource(inst);
404 return err;
405 }
406
SpungeSysCallRegisted(void)407 static FILLP_INT SpungeSysCallRegisted(void)
408 {
409 FILLP_INT ret;
410
411 ret = FillpValidateFuncPtr(&g_fillpOsBasicLibFun, sizeof(FillpSysLibBasicCallbackFuncSt));
412 if (ret != ERR_OK) {
413 SET_ERRNO(FILLP_EINVAL);
414 return ret;
415 }
416
417 ret = FillpValidateFuncPtr(&g_fillpOsSemLibFun, sizeof(FillpSysLibSemCallbackFuncSt));
418 if (ret != ERR_OK) {
419 SET_ERRNO(FILLP_EINVAL);
420 return ret;
421 }
422
423 ret = FillpValidateFuncPtr(&g_fillpOsSocketLibFun, sizeof(FillpSysLibSockCallbackFuncSt));
424 if (ret != ERR_OK) {
425 SET_ERRNO(FILLP_EINVAL);
426 return ret;
427 }
428
429 return ERR_OK;
430 }
431
FtFreeEpollResource(void)432 static void FtFreeEpollResource(void)
433 {
434 if (g_spunge->epitemPool != FILLP_NULL_PTR) {
435 DympDestroyPool(g_spunge->epitemPool);
436 g_spunge->epitemPool = FILLP_NULL_PTR;
437 }
438
439 if (g_spunge->eventpollPool != FILLP_NULL_PTR) {
440 DympDestroyPool(g_spunge->eventpollPool);
441 g_spunge->eventpollPool = FILLP_NULL_PTR;
442 }
443 return;
444 }
445
FtAllocateEpollResource(void)446 static FILLP_INT FtAllocateEpollResource(void)
447 {
448 DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
449 g_spunge->epitemPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
450 sizeof(struct EpItem), FILLP_TRUE, &itemOperaCb);
451 if (g_spunge->epitemPool == FILLP_NULL_PTR) {
452 FILLP_LOGERR("create mem pool for g_spunge->epitemPool failed");
453 return ERR_NORES;
454 }
455 DympSetConsSafe(g_spunge->epitemPool, FILLP_TRUE);
456 DympSetProdSafe(g_spunge->epitemPool, FILLP_TRUE);
457
458 g_spunge->eventpollPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
459 sizeof(struct EventPoll), FILLP_TRUE, &itemOperaCb);
460 if (g_spunge->eventpollPool == FILLP_NULL_PTR) {
461 FtFreeEpollResource();
462 FILLP_LOGERR("create Dym pool for g_spunge->eventpollPool failed");
463 return ERR_NORES;
464 }
465 DympSetConsSafe(g_spunge->eventpollPool, FILLP_TRUE);
466 DympSetProdSafe(g_spunge->eventpollPool, FILLP_TRUE);
467 return ERR_OK;
468 }
469
SpungeAllocInstRes(void)470 static int SpungeAllocInstRes(void)
471 {
472 FILLP_UINT i;
473 FILLP_UINT j;
474 FILLP_INT err;
475
476 for (i = 0; i < g_spunge->insNum; i++) {
477 (void)memset_s(&g_spunge->instPool[i], sizeof(struct SpungeInstance), 0, sizeof(struct SpungeInstance));
478 g_spunge->instPool[i].instIndex = (FILLP_INT)i;
479 err = SpungeInstInit(&g_spunge->instPool[i]);
480 if (err == ERR_OK) {
481 continue;
482 }
483 FILLP_LOGERR("SpungeInstInit failed :: Instance number :: %u\r\n", i);
484
485 /* Release instances which are created success */
486 if (i > 0) {
487 g_spunge->insNum = i;
488
489 g_spunge->hasDeinitBlked = FILLP_TRUE;
490 for (j = 0; j < g_spunge->insNum; j++) {
491 g_spunge->instPool[j].waitTobeCoreKilled = FILLP_TRUE;
492 }
493
494 /* After this step g_spunge will be freed, it should not be accessed, caller has check for NULL pointer
495 before accessing, so it will not cause problem */
496 (void)SYS_ARCH_SEM_WAIT(&g_resDeinitSem);
497 }
498 return err;
499 }
500
501 return ERR_OK;
502 }
503
SpungeFreeInstSendRecv(struct SpungeInstance * inst)504 static void SpungeFreeInstSendRecv(struct SpungeInstance *inst)
505 {
506 int j;
507 if (inst->thresdSemInited) {
508 (void)SYS_ARCH_SEM_DESTROY(&inst->threadSem);
509 inst->thresdSemInited = FILLP_FALSE;
510 }
511
512 for (j = 0; j < FILLP_INST_UNSEND_BOX_NUM; j++) {
513 if (inst->unsendBox[j] != FILLP_NULL_PTR) {
514 FillpQueueDestroy(inst->unsendBox[j]);
515 inst->unsendBox[j] = FILLP_NULL_PTR;
516 } else {
517 break;
518 }
519 }
520 if (inst->unsendItem != FILLP_NULL_PTR) {
521 SpungeFree(inst->unsendItem, SPUNGE_ALLOC_TYPE_CALLOC);
522 inst->unsendItem = FILLP_NULL_PTR;
523 }
524
525 for (j = 0; j < FILLP_VLEN; j++) {
526 if (inst->tmpBuf[j] == FILLP_NULL_PTR) {
527 break;
528 }
529 SpungeFree(inst->tmpBuf[j], SPUNGE_ALLOC_TYPE_MALLOC);
530 inst->tmpBuf[j] = FILLP_NULL_PTR;
531 }
532 }
533
SpungeFreeInstanceResource(struct SpungeInstance * inst)534 void SpungeFreeInstanceResource(struct SpungeInstance *inst)
535 {
536 if (inst == FILLP_NULL_PTR) {
537 return;
538 }
539
540 if (inst->msgBox != FILLP_NULL_PTR) {
541 FillpQueueDestroy(inst->msgBox);
542 inst->msgBox = FILLP_NULL_PTR;
543 }
544
545 while (SYS_ARCH_ATOMIC_READ(&inst->msgUsingCount) > 0) {
546 FILLP_SLEEP_MS(1);
547 }
548
549 if (inst->msgPool != FILLP_NULL_PTR) {
550 SpungeMsgPoolDestroy(inst->msgPool);
551 inst->msgPool = FILLP_NULL_PTR;
552 }
553
554 SpungeFreeInstSendRecv(inst);
555
556 inst->hasInited = FILLP_FALSE;
557 }
558
FtGetSpungeRes(struct SpungeResConf * resConf)559 static void FtGetSpungeRes(struct SpungeResConf *resConf)
560 {
561 (void)memset_s(resConf, sizeof(struct SpungeResConf), 0, sizeof(struct SpungeResConf));
562
563 resConf->maxInstNum = (FILLP_UINT)UTILS_MIN(g_resource.common.maxInstNum, MAX_SPUNGEINSTANCE_NUM);
564 resConf->maxSockNum = g_resource.common.maxSockNum;
565 resConf->maxConnNum = g_resource.common.maxConnNum;
566 resConf->maxMsgItemNum = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_SPUNGE_EVENTG_MULT_NUM);
567 resConf->maxTimerItemNum = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
568 resConf->maxEpollEventNum = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
569 resConf->maxEpollItemNum = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
570 }
571
FtGlobalTimerInit(struct SpungeInstance * inst)572 void FtGlobalTimerInit(struct SpungeInstance *inst)
573 {
574 /* Initialize the Fairness timer */
575 inst->fairTimerNode.cbNode.cb = SpinstLoopFairnessChecker;
576 inst->fairTimerNode.cbNode.arg = (void *)inst;
577 inst->fairTimerNode.interval = SPUNGE_WEIGHT_ADJUST_INTERVAL;
578 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
579 &inst->fairTimerNode);
580 /* Initialize the MAC timer */
581 inst->macTimerNode.cbNode.cb = SpinstLoopMacTimerChecker;
582 inst->macTimerNode.cbNode.arg = (void *)inst;
583 inst->macTimerNode.interval = FILLP_KEY_REFRESH_TIME;
584 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
585 &inst->macTimerNode);
586 return;
587 }
588
SpungeCheckCallbacks(void)589 static FILLP_INT SpungeCheckCallbacks(void)
590 {
591 FILLP_INT ret = SpungeSysCallRegisted();
592 return ret;
593 }
594
FtInitGlobalUdpIo(void)595 static FILLP_INT FtInitGlobalUdpIo(void)
596 {
597 g_udpIo.readSet = FILLP_FD_CREATE_FD_SET();
598 if (g_udpIo.readSet == FILLP_NULL_PTR) {
599 FILLP_LOGERR("Malloc g_udpIo.readSet failed");
600 return ERR_NORES;
601 }
602
603 g_udpIo.readableSet = FILLP_FD_CREATE_FD_SET();
604 if (g_udpIo.readableSet == FILLP_NULL_PTR) {
605 FILLP_LOGERR("Malloc g_udpIo.readableSet failed");
606 return ERR_NORES;
607 }
608
609 HLIST_INIT(&g_udpIo.listenPcbList);
610
611 return ERR_OK;
612 }
613
FtInitGlobalInstPool(void)614 static FILLP_INT FtInitGlobalInstPool(void)
615 {
616 g_spunge->insNum = g_spunge->resConf.maxInstNum;
617 g_spunge->instPool = (struct SpungeInstance *)SpungeAlloc(g_spunge->insNum, sizeof(struct SpungeInstance),
618 SPUNGE_ALLOC_TYPE_MALLOC);
619 if (g_spunge->instPool == FILLP_NULL_PTR) {
620 FILLP_LOGERR("Malloc g_spunge->instPool failed");
621 return ERR_NORES;
622 }
623
624 return ERR_OK;
625 }
626
FtInitGlobalSockTable(void)627 static FILLP_INT FtInitGlobalSockTable(void)
628 {
629 g_spunge->sockTable = SpungeCreateSockTable(g_spunge->resConf.maxSockNum);
630 if (g_spunge->sockTable == FILLP_NULL_PTR) {
631 FILLP_LOGERR("Malloc g_spunge->sockTable failed\r\n");
632 return ERR_NORES;
633 }
634 return ERR_OK;
635 }
636
FtInitGlobalNetPool(void)637 static FILLP_INT FtInitGlobalNetPool(void)
638 {
639 FILLP_UINT netPoolInitSize = FILLP_CONN_ITEM_INIT_NUM;
640
641 if (netPoolInitSize > g_spunge->resConf.maxConnNum) {
642 netPoolInitSize = g_spunge->resConf.maxConnNum;
643 }
644
645 DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
646 g_spunge->netPool = DympCreatePool((FILLP_INT)netPoolInitSize, (int)g_spunge->resConf.maxConnNum,
647 sizeof(struct FtNetconn), FILLP_TRUE, &itemOperaCb);
648 if (g_spunge->netPool == FILLP_NULL_PTR) {
649 FILLP_LOGERR("Malloc g_spunge->netPool failed\r\n");
650 return ERR_NORES;
651 }
652
653 DympSetConsSafe(g_spunge->netPool, FILLP_TRUE);
654 DympSetProdSafe(g_spunge->netPool, FILLP_FALSE);
655 return ERR_OK;
656 }
657
FtFreeGlobalUdpIo(void)658 static void FtFreeGlobalUdpIo(void)
659 {
660 if (g_udpIo.readSet != FILLP_NULL_PTR) {
661 FILLP_FD_DESTROY_FD_SET(g_udpIo.readSet);
662 g_udpIo.readSet = FILLP_NULL_PTR;
663 }
664
665 if (g_udpIo.readableSet != FILLP_NULL_PTR) {
666 FILLP_FD_DESTROY_FD_SET(g_udpIo.readableSet);
667 g_udpIo.readableSet = FILLP_NULL_PTR;
668 }
669 }
670
FtFreeGlobalSpunge(void)671 static void FtFreeGlobalSpunge(void)
672 {
673 if (g_spunge == FILLP_NULL_PTR) {
674 return;
675 }
676 g_spunge->hasInited = FILLP_FALSE;
677
678 FtFreeEpollResource();
679
680 if (g_spunge->sockTable != FILLP_NULL_PTR) {
681 SpungeDestroySockTable(g_spunge->sockTable);
682 g_spunge->sockTable = FILLP_NULL_PTR;
683 }
684
685 if (g_spunge->netPool != FILLP_NULL_PTR) {
686 DympDestroyPool(g_spunge->netPool);
687 g_spunge->netPool = FILLP_NULL_PTR;
688 }
689
690 if (g_spunge->instPool != FILLP_NULL_PTR) {
691 SpungeFree(g_spunge->instPool, SPUNGE_ALLOC_TYPE_MALLOC);
692 g_spunge->instPool = FILLP_NULL_PTR;
693 }
694
695 FtFreeGlobalUdpIo();
696
697 SpungeFree(g_spunge, SPUNGE_ALLOC_TYPE_MALLOC);
698 g_spunge = FILLP_NULL_PTR;
699 }
700
FtModuleInit(void)701 static FILLP_INT FtModuleInit(void)
702 {
703 FILLP_INT err;
704 int ret;
705
706 err = FtInitGlobalUdpIo();
707 if (err != ERR_OK) {
708 return err;
709 }
710
711 err = FtInitGlobalInstPool();
712 if (err != ERR_OK) {
713 return err;
714 }
715
716 err = FtInitGlobalSockTable();
717 if (err != ERR_OK) {
718 return err;
719 }
720
721 err = FtInitGlobalNetPool();
722 if (err != ERR_OK) {
723 return err;
724 }
725
726 err = FtAllocateEpollResource();
727 if (err != ERR_OK) {
728 FILLP_LOGERR("Alloc epoll resource fail");
729 return err;
730 }
731
732 ret = SYS_ARCH_SEM_INIT(&g_resDeinitSem, 0);
733 if (ret != FILLP_OK) {
734 FILLP_LOGERR("deinit sem init failed. ");
735 return ERR_NORES;
736 }
737
738 err = SpungeAllocInstRes();
739 if (err != ERR_OK) {
740 FILLP_LOGERR("Spunge init instances resource fail");
741 (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
742 return err;
743 }
744 return ERR_OK;
745 }
746
FtInit(void)747 FILLP_INT FtInit(void)
748 {
749 FILLP_INT err;
750
751 FILLP_LOGBUTT("init stack");
752 if (g_spunge != FILLP_NULL_PTR) {
753 FILLP_LOGERR("Init already done");
754 return ERR_STACK_ALREADY_INITIALD;
755 }
756
757 if (SpungeCheckCallbacks() != ERR_OK) {
758 FILLP_LOGERR("User has not registered system callback functions \r\n");
759 return ERR_ADP_SYS_CALLBACK_NOT_REGISTERED;
760 }
761
762 if (SYS_ARCH_INIT() != ERR_OK) {
763 FILLP_LOGERR("SYS_ARCH_INIT ssp failed \r\n");
764 return ERR_NORES;
765 }
766
767 g_spunge = (struct Spunge *)SpungeAlloc(1, sizeof(struct Spunge), SPUNGE_ALLOC_TYPE_MALLOC);
768 if (g_spunge == FILLP_NULL_PTR) {
769 FILLP_LOGERR("Alloc g_spunge fail");
770 return ERR_NORES;
771 }
772
773 (void)memset_s(g_spunge, sizeof(struct Spunge), FILLP_NULL_NUM, sizeof(struct Spunge));
774
775 FtGetSpungeRes(&g_spunge->resConf);
776
777 err = FtModuleInit();
778 if (err != ERR_OK) {
779 goto ERR_FAIL;
780 }
781
782 FILLP_LOGBUTT("FillP_init: Spunge mem_zone alloc finished!");
783
784 FILLP_LOGBUTT("FillP Core init success!");
785 FILLP_LOGBUTT("version " FILLP_VERSION);
786
787 g_spunge->traceFlag = 0;
788 g_spunge->hasInited = FILLP_TRUE;
789 FILLP_LOGBUTT("Init success");
790 return ERR_OK;
791
792 ERR_FAIL:
793 FtFreeGlobalSpunge();
794
795 FILLP_LOGERR("Init fail,clean up");
796 return err;
797 }
798
799
800 /* starts from LSB bit position, cnt starts from 0 */
801 #define SPUNGE_SET_BIT(num, pos) ((num) |= (1U << (pos)))
802
SpungZeroInstance(void)803 static void SpungZeroInstance(void)
804 {
805 FILLP_BOOL hasDeinitBlked = g_spunge->hasDeinitBlked;
806 /* This logic can work for 32 instance in future need to change if more number of
807 instance are supported */
808 /* instance 0 is already closed so mark in bit field. */
809 FILLP_UINT32 instBitClosed = 1;
810 FILLP_UINT32 i;
811 FILLP_UINT32 instAllBit = (FILLP_UINT32)((1U << g_spunge->insNum) - 1);
812
813 /* In case of blocking FtDestroy 0th instance should post semaphore after all instance threads are exited,
814 and all resources are release. In case on non blocking FtDestroy 0th instance should free free all
815 reasource no need to post semaphore and need to release semaphore also
816 Wait for other instance threads to release respective resource and exit thread */
817 while (instBitClosed != instAllBit) {
818 FILLP_SLEEP_MS(1);
819 for (i = 1; i < g_spunge->insNum; i++) {
820 if (g_spunge->instPool[i].hasInited == 0) {
821 /* Mark as closed */
822 SPUNGE_SET_BIT(instBitClosed, i);
823 }
824 }
825 }
826
827 /* Free all global resource and reset parameters */
828 InitGlobalResourceDefault();
829 InitGlobalAppResourceDefault();
830 FtFreeGlobalSpunge();
831 FillpSysOsDeinit();
832 FillpDfxDoEvtCbSet(FILLP_NULL_PTR, FILLP_NULL_PTR);
833
834 /* Signal or release deinit sem */
835 if (hasDeinitBlked) {
836 (void)SYS_ARCH_SEM_POST(&g_resDeinitSem);
837 } else {
838 (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
839 }
840 }
841
SpungeDestroyInstance(struct SpungeInstance * inst)842 void SpungeDestroyInstance(struct SpungeInstance *inst)
843 {
844 FILLP_INT instIdx = inst->instIndex;
845
846 SpungeFreeInstanceResource(inst);
847
848 if (instIdx == 0) {
849 SpungZeroInstance();
850 }
851
852 FILLP_LOGERR("Destroy finish index: %d", instIdx);
853 }
854
/*
 * Request destruction of the stack.
 *
 * Records the blocking mode in g_spunge, then flags every instance with
 * waitTobeCoreKilled while holding that instance's threadSem. In blocking mode
 * the call then waits on g_resDeinitSem and destroys it once the wait
 * succeeds. Does nothing when the stack is not initialised.
 *
 * block: non-zero to wait until teardown has completed.
 */
static void FtDestroyInner(FILLP_INT block)
{
    FILLP_UINT idx = 0;

    FILLP_LOGERR("Destroy stack start, block(%d)", block);

    if ((g_spunge == FILLP_NULL_PTR) || (!g_spunge->hasInited)) {
        return;
    }

    g_spunge->hasDeinitBlked = (FILLP_BOOL)block;

    /*
     * g_spunge is checked again on every iteration because it may be freed
     * by the main thread once all instances have been freed
     */
    while ((g_spunge != FILLP_NULL_PTR) && (idx < g_spunge->insNum)) {
        (void)SYS_ARCH_SEM_WAIT(&g_spunge->instPool[idx].threadSem);
        g_spunge->instPool[idx].waitTobeCoreKilled = FILLP_TRUE;
        (void)SYS_ARCH_SEM_POST(&g_spunge->instPool[idx].threadSem);
        idx++;
    }

    if (block) {
        if (SYS_ARCH_SEM_WAIT(&g_resDeinitSem) == 0) {
            (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
        }
    }

    FILLP_LOGERR("Destroy finished");
}
883
FtDestroy(void)884 void FtDestroy(void)
885 {
886 FtDestroyInner(FILLP_TRUE);
887 }
888
FtDestroyNonblock(void)889 void FtDestroyNonblock(void)
890 {
891 FtDestroyInner(FILLP_FALSE);
892 }
893
SpungeHandleMsgCycle(struct SpungeInstance * inst)894 void SpungeHandleMsgCycle(struct SpungeInstance *inst)
895 {
896 struct SpungeMsg *msg = FILLP_NULL_PTR;
897 FILLP_INT ret;
898 FILLP_ULONG i;
899
900 FILLP_ULONG boxItems = FillpQueueValidOnes(inst->msgBox);
901 if ((boxItems == 0) || (boxItems > inst->msgBox->size)) {
902 boxItems = (FILLP_ULONG)inst->msgBox->size;
903 }
904
905 for (i = 0; i < boxItems; i++) {
906 ret = FillpQueuePop(inst->msgBox, (void *)&msg, 1);
907 if (ret <= 0) {
908 break;
909 }
910 if (msg->msgType < MSG_TYPE_END) {
911 g_msgHandler[msg->msgType](msg->value, inst);
912 }
913 if (!msg->block) {
914 DympFree(msg);
915 } else {
916 (void)SYS_ARCH_SEM_POST(&msg->syncSem);
917 }
918 }
919
920 return;
921 }
922
SpungeLoopCheckUnsendBox(struct SpungeInstance * inst)923 static void SpungeLoopCheckUnsendBox(struct SpungeInstance *inst)
924 {
925 int j;
926 FillpQueue *boxQueue = inst->unsendBox[0];
927 struct FillpPcbItem **item = inst->unsendItem;
928 struct FtNetconn *netconn = FILLP_NULL_PTR;
929 struct FillpPcb *fpcb = FILLP_NULL_PTR;
930 FILLP_INT count;
931
932 count = FillpQueuePop(boxQueue, (void *)item, FILLP_UNSEND_BOX_LOOP_CHECK_BURST);
933 if (count <= 0) {
934 return;
935 }
936
937 for (j = 0; j < count; j++) {
938 netconn = (struct FtNetconn *)item[j]->netconn;
939 if (netconn == FILLP_NULL_PTR) {
940 FillpFreeBufItem(item[j]);
941 continue;
942 }
943
944 fpcb = &(netconn->pcb->fpcb);
945 HlistAddTail(&fpcb->send.unSendList, &item[j]->unsendNode);
946 (void)FillpFrameAddItem(&fpcb->frameHandle, item[j]);
947 FillpPcbSendFc(fpcb);
948 }
949 }
950
SpungeDelay(struct SpungeInstance * inst,FILLP_LLONG curTime)951 static FILLP_BOOL SpungeDelay(struct SpungeInstance *inst, FILLP_LLONG curTime)
952 {
953 FILLP_LLONG timePass = curTime - inst->curTime;
954
955 FILLP_LLONG minSendInterval = (FILLP_LLONG)((FILLP_ULLONG)inst->minSendInterval >> FILLP_TIME_PRECISION);
956 if ((timePass > minSendInterval) && (timePass > FILLP_MINIMUM_SELECT_TIME)) {
957 minSendInterval = 0;
958 } else if (minSendInterval < FILLP_MINIMUM_SELECT_TIME) {
959 minSendInterval = FILLP_MINIMUM_SELECT_TIME;
960 }
961
962 if (SYS_ARCH_SEM_POST(&inst->threadSem)) {
963 FILLP_LOGWAR("sem wait failed");
964 }
965 if (inst->pcbList.list.size > 0) {
966 (void)SysioSelect((FILLP_INT)minSendInterval);
967 } else {
968 FILLP_SLEEP_MS((FILLP_UINT)FILLP_UTILS_US2MS(minSendInterval));
969 }
970 if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
971 FILLP_LOGWAR("sem wait failed");
972 }
973 return FILLP_TRUE;
974 }
975
SpungeMainDelay(struct SpungeInstance * inst)976 static FILLP_BOOL SpungeMainDelay(struct SpungeInstance *inst)
977 {
978 FILLP_BOOL isTimeout = FILLP_TRUE;
979 FILLP_LLONG curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
980
981 if (g_resource.common.fullCpuEnable && (inst->stb.tbFpcbLists.size > 0)) {
982 (void)SysioSelect(0);
983 inst->curTime = curTime;
984 return isTimeout;
985 }
986
987 if (curTime < inst->curTime) {
988 FILLP_LOGERR("System Time has been changed to past value");
989 return isTimeout;
990 }
991 isTimeout = SpungeDelay(inst, curTime);
992 curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
993 if (curTime < inst->curTime) {
994 FILLP_LOGERR("System Time has been changed to past value\r\n");
995 return isTimeout;
996 }
997
998 inst->curTime = curTime;
999 inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
1000 return isTimeout;
1001 }
1002
/*
 * Computes the per-connection receive rate limit from the aggregate rate.
 *
 * inst:              instance whose rateControl.recv.maxRate caps the result.
 * calcRecvTotalRate: sum of the receive rates of the connections considered.
 * realRecvConn:      number of connections the total is shared between.
 * connRecvCalLimit:  output, the limit to apply to each connection.
 *
 * A stability counter (function-static, so shared by every caller) goes up
 * while the total stays between RECV_RATE_PAR_LOW and RECV_RATE_PAT_HIGH
 * times the previous total, and goes down otherwise. Below
 * FILLP_FC_STABLESTATE_VAL_2 the total is scaled by FILL_FC_SEND_RATE_TOTAL_1,
 * at or above it by FILL_FC_SEND_RATE_TOTAL_2.
 *
 * NOTE: an earlier branch compared the total against a "historical max"
 * that was a compile-time constant 0; with an unsigned total it could never
 * be taken, so it has been dropped without changing behaviour.
 */
void FillpServerRecvRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcRecvTotalRate, FILLP_INT realRecvConn,
    FILLP_UINT32 *connRecvCalLimit)
{
    static FILLP_UINT8 recvStableState = 0;
    static FILLP_UINT32 prevRecvTotRate = 0;

    if ((calcRecvTotalRate > (RECV_RATE_PAR_LOW * prevRecvTotRate)) &&
        (calcRecvTotalRate < (RECV_RATE_PAT_HIGH * prevRecvTotRate))) {
        if (recvStableState < RECV_STATE_THRESHOLD) {
            recvStableState++;
        }
    } else {
        if (recvStableState > 0) {
            recvStableState--;
        }
    }

    prevRecvTotRate = calcRecvTotalRate;

    /* Leave every connection some room to grow, since network conditions
       may differ from one connection to another */
    if (recvStableState < FILLP_FC_STABLESTATE_VAL_2) {
        calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
    } else {
        calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
    }

    /* The total must never exceed the configured receive rate */
    if (calcRecvTotalRate > inst->rateControl.recv.maxRate) {
        calcRecvTotalRate = inst->rateControl.recv.maxRate;
    }

    if (realRecvConn > 0) {
        *connRecvCalLimit = (FILLP_UINT32)((double)calcRecvTotalRate / realRecvConn);
    } else {
        /* No connection is counted: every connection gets the maximum limit */
        *connRecvCalLimit = inst->rateControl.recv.maxRate;
    }
}
1055
/*
 * Computes the per-connection send rate limit from the aggregate rate.
 *
 * inst:              instance whose rateControl.send.maxRate caps the result.
 * calcSendTotalRate: sum of the send rates of the connections considered.
 * realSendConn:      number of connections the total is shared between.
 * connSendCalLimit:  output, the limit to apply to each connection.
 *
 * A stability counter (function-static, so shared by every caller) goes up
 * while the total stays between FILLP_FC_PREV_ADJUSTMENT_RATE_LOW_VAL and
 * FILLP_FC_PREV_ADJUSTMENT_RATE_HIGH_VAL times the previous total, up to
 * FILLP_FC_STABLESTATE_VAL_1, and goes down otherwise. Below
 * FILLP_FC_STABLESTATE_VAL_2 the total is scaled by FILL_FC_SEND_RATE_TOTAL_1,
 * at or above it by FILL_FC_SEND_RATE_TOTAL_2.
 *
 * NOTE: an earlier branch compared the total against a "historical max"
 * that was a compile-time constant 0; with an unsigned total it could never
 * be taken, so it has been dropped without changing behaviour.
 */
void FillpServerSendRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcSendTotalRate, FILLP_INT realSendConn,
    FILLP_UINT32 *connSendCalLimit)
{
    static FILLP_UINT8 sendStableState = 0;
    static FILLP_UINT32 prevSendTotRate = 0;

    if ((calcSendTotalRate > (FILLP_FC_PREV_ADJUSTMENT_RATE_LOW_VAL * prevSendTotRate)) &&
        (calcSendTotalRate < (FILLP_FC_PREV_ADJUSTMENT_RATE_HIGH_VAL * prevSendTotRate))) {
        if (sendStableState < FILLP_FC_STABLESTATE_VAL_1) {
            sendStableState++;
        }
    } else {
        if (sendStableState > 0) {
            sendStableState--;
        }
    }

    prevSendTotRate = calcSendTotalRate;

    /* Leave every connection some room to grow, since network conditions
       may differ from one connection to another */
    if (sendStableState < FILLP_FC_STABLESTATE_VAL_2) {
        calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
    } else {
        calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
    }

    /* The total must never exceed the configured send rate */
    if (calcSendTotalRate > inst->rateControl.send.maxRate) {
        calcSendTotalRate = inst->rateControl.send.maxRate;
    }

    if (realSendConn > 0) {
        *connSendCalLimit = (FILLP_UINT32)((double)calcSendTotalRate / realSendConn);
    } else {
        /* No connection is counted: every connection gets the maximum limit */
        *connSendCalLimit = inst->rateControl.send.maxRate;
    }
}
1108
FillpCalculateFairness(struct SpungeInstance * inst)1109 void FillpCalculateFairness(struct SpungeInstance *inst)
1110 {
1111 struct HlistNode *pcbNode = FILLP_NULL_PTR;
1112 struct SpungePcb *pcb = FILLP_NULL_PTR;
1113 FILLP_INT realSendConn = 0;
1114 FILLP_INT realRecvConn = 0;
1115 struct FtNetconn *conn = FILLP_NULL_PTR;
1116 FILLP_UINT8 connState;
1117 FILLP_UINT32 connRecvCalLimit;
1118 FILLP_UINT32 connSendCalLimit;
1119 FILLP_UINT32 calcRecvTotalRate = 0;
1120 FILLP_UINT32 calcSendTotalRate = 0;
1121
1122 pcbNode = HLIST_FIRST(&inst->pcbList.list);
1123 while (pcbNode != FILLP_NULL_PTR) {
1124 pcb = SpungePcbListNodeEntry(pcbNode);
1125 pcbNode = pcbNode->next;
1126 conn = (struct FtNetconn *)pcb->conn;
1127
1128 connState = NETCONN_GET_STATE(conn);
1129 if (connState > CONN_STATE_CONNECTED) {
1130 /* Connection state is greater than the connected state, so skip and continue */
1131 continue;
1132 }
1133
1134 if (pcb->fpcb.statistics.pack.periodRecvRate > FILLP_DEFAULT_MIN_RATE) {
1135 realRecvConn++;
1136 }
1137
1138 if (pcb->fpcb.statistics.pack.periodSendRate > FILLP_DEFAULT_MIN_RATE) {
1139 realSendConn++;
1140 }
1141
1142 /* Calculate for Data receiving on server side */
1143 calcRecvTotalRate = calcRecvTotalRate + pcb->fpcb.statistics.pack.periodRecvRate;
1144
1145 /* Calculate for Data sending from server side */
1146 calcSendTotalRate = calcSendTotalRate + pcb->fpcb.statistics.pack.periodAckByPackRate;
1147 }
1148
1149 /* Calculation of rate adjustment for Data receiving at server side */
1150 FillpServerRecvRateAdjustment(inst, calcRecvTotalRate, realRecvConn, &connRecvCalLimit);
1151
1152 /* Calculation of rate adjustment for Data Sending at server side */
1153 FillpServerSendRateAdjustment(inst, calcSendTotalRate, realSendConn, &connSendCalLimit);
1154
1155 pcbNode = HLIST_FIRST(&inst->pcbList.list);
1156 while (pcbNode != FILLP_NULL_PTR) {
1157 pcb = SpungePcbListNodeEntry(pcbNode);
1158 pcbNode = pcbNode->next;
1159
1160 /* The rate is set to all the connections irrespective of whether the
1161 connection is idle or not, so that, once the connection starts pumping
1162 the data, it will have enough window to start with.
1163 All this algorithm will adjust the rate of all the connections accordingly */
1164 pcb->rateControl.recv.curMaxRateLimitation = connRecvCalLimit;
1165 pcb->fpcb.recv.oppositeSetRate = pcb->rateControl.recv.curMaxRateLimitation;
1166
1167 pcb->rateControl.send.curMaxRateLimitation = connSendCalLimit;
1168 pcb->fpcb.send.flowControl.sendRateLimit = pcb->rateControl.send.curMaxRateLimitation;
1169 }
1170
1171 return;
1172 }
1173
FillpKillCore(void)1174 FILLP_BOOL FillpKillCore(void)
1175 {
1176 FILLP_UINT16 i;
1177 for (i = 0; i < SYS_ARCH_ATOMIC_READ(&g_spunge->sockTable->used); i++) {
1178 struct FtSocket *sock = g_spunge->sockTable->sockPool[i];
1179
1180 if ((sock->allocState != SOCK_ALLOC_STATE_FREE)) {
1181 return FILLP_FALSE;
1182 }
1183 }
1184
1185 return FILLP_TRUE;
1186 }
1187
FillpCheckPcbNackListToSend(void * args)1188 void FillpCheckPcbNackListToSend(void *args)
1189 {
1190 struct SpungePcb *pcb = ((struct FillpPcb *)args)->spcb;
1191 struct Hlist *nackList = FILLP_NULL_PTR;
1192 FILLP_LLONG curTime;
1193 struct HlistNode *node = FILLP_NULL_PTR;
1194 struct HlistNode *tmp = FILLP_NULL_PTR;
1195
1196 if (pcb == FILLP_NULL_PTR) {
1197 FILLP_LOGERR("spunge_pcb is NULL");
1198 return;
1199 }
1200
1201 nackList = &(pcb->fpcb.recv.nackList);
1202 if (nackList->size == 0) {
1203 return;
1204 }
1205
1206 curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
1207 node = HLIST_FIRST(nackList);
1208 while (node != FILLP_NULL_PTR) {
1209 struct FillpNackNode *nackNode = FillpNackNodeEntry(node);
1210 FILLP_LLONG timestamp = nackNode->timestamp;
1211 /*
1212 Commenting the timeout check again here, since the timing wheel
1213 will ensure that the time has elapsed before invoking this timeout
1214 function
1215 */
1216 if (curTime > timestamp) {
1217 FILLP_UINT32 startPktNum = nackNode->startPktNum;
1218 FILLP_UINT32 endPktNum = nackNode->endPktNum;
1219 FillpSendNack(&(pcb->fpcb), startPktNum, endPktNum);
1220 tmp = node;
1221 node = node->next;
1222 HlistDelete(nackList, tmp);
1223 SpungeFree(nackNode, SPUNGE_ALLOC_TYPE_CALLOC);
1224 nackNode = FILLP_NULL_PTR;
1225 } else {
1226 break;
1227 }
1228 }
1229
1230 /* if all the delay NACKs are sent out, then stop the timer */
1231 if (nackList->size > 0) {
1232 FillpEnableDelayNackTimer((struct FillpPcb *)args);
1233 }
1234 }
1235
SpinstLoopMacTimerChecker(void * p)1236 void SpinstLoopMacTimerChecker(void *p)
1237 {
1238 struct SpungeInstance *inst = (struct SpungeInstance *)p;
1239 /* Check server cookie Refresh */
1240 /* Duration is put as 30minutes, 1 Minute = 60,000 Milliseconds
1241 */
1242 if (((inst->curTime - (FILLP_LLONG)inst->macInfo.switchOverTime) > FILLP_KEY_REFRESH_TIME)) {
1243 FillpMacTimerExpire(&inst->macInfo, inst->curTime);
1244 }
1245 if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->macTimerNode)) {
1246 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
1247 &inst->macTimerNode);
1248 }
1249 }
1250
SpinstLoopFairnessChecker(void * p)1251 void SpinstLoopFairnessChecker(void *p)
1252 {
1253 struct SpungeInstance *inst = (struct SpungeInstance *)p;
1254
1255 if ((g_resource.flowControl.supportFairness == FILLP_FAIRNESS_TYPE_EQUAL_WEIGHT) &&
1256 (inst->rateControl.connectionNum > 0)) {
1257 inst->rateControl.lastControlTime = inst->curTime;
1258 FillpCalculateFairness(inst);
1259 }
1260
1261 if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->fairTimerNode)) {
1262 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
1263 &inst->fairTimerNode);
1264 }
1265 }
1266
SpungeEnableTokenTimer(struct SpungeTokenBucke * stb)1267 void SpungeEnableTokenTimer(struct SpungeTokenBucke *stb)
1268 {
1269 if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1270 FillpTimingWheelAddTimer(&stb->inst->timingWheel, stb->tockenTimerNode.interval + stb->inst->curTime,
1271 &stb->tockenTimerNode);
1272 }
1273 }
1274
SpungeDisableTokenTimer(struct SpungeTokenBucke * stb)1275 void SpungeDisableTokenTimer(struct SpungeTokenBucke *stb)
1276 {
1277 if (FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1278 FillpTimingWheelDelTimer(stb->tockenTimerNode.wheel, &stb->tockenTimerNode);
1279 }
1280 }
1281
SpungeTokenTimerCb(void * p)1282 void SpungeTokenTimerCb(void *p)
1283 {
1284 struct SpungeTokenBucke *stb = (struct SpungeTokenBucke *)p;
1285 struct SpungeInstance *inst = (struct SpungeInstance *)stb->inst;
1286 FILLP_ULLONG bitAdded;
1287 FILLP_UINT32 tokens;
1288
1289 if (stb->rate != g_resource.flowControl.limitRate) {
1290 FILLP_UINT32 rate_bck = stb->rate;
1291 stb->rate = g_resource.flowControl.limitRate;
1292 stb->tokenCount = 0;
1293
1294 if (stb->rate != 0) {
1295 stb->tockenTimerNode.interval = (FILLP_UINT32)(
1296 ((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1297 if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1298 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1299 }
1300 } else {
1301 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1302 }
1303
1304 FILLP_LOGINF("limite rate change from:%u to:%u, timer_interval:%u, maxPktSize:%u", rate_bck, stb->rate,
1305 stb->tockenTimerNode.interval, stb->maxPktSize);
1306 }
1307
1308 bitAdded = (FILLP_ULLONG)(inst->curTime - stb->lastTime) * (FILLP_ULLONG)stb->rate;
1309 stb->lastTime = inst->curTime;
1310 tokens = (FILLP_UINT32)((bitAdded / (FILLP_ULLONG)FILLP_BPS_TO_KBPS) >> BIT_MOVE_CNT);
1311 if ((tokens < stb->maxPktSize) || (stb->tokenCount < stb->maxPktSize)) {
1312 stb->tokenCount += tokens;
1313 } else {
1314 stb->tokenCount = tokens;
1315 }
1316
1317 if (stb->tockenTimerNode.interval != SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1318 SpungeEnableTokenTimer(stb);
1319 }
1320 }
1321
SpungeItemRouteByToken(struct FillpPcbItem * item,struct FillpPcb * fpcb)1322 FILLP_INT SpungeItemRouteByToken(struct FillpPcbItem *item, struct FillpPcb *fpcb)
1323 {
1324 struct SpungeTokenBucke *stb;
1325 FILLP_INT ret = ERR_OK;
1326
1327 stb = &fpcb->pcbInst->stb;
1328
1329 if (stb->tockenTimerNode.interval == SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1330 SpungeTokenTimerCb(stb);
1331 }
1332 if ((stb->rate == 0) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) { /* no limit or limit -> nolimit */
1333 ret = FillpSendItem(item, fpcb);
1334 } else if ((stb->tokenCount >= (FILLP_UINT32)item->dataLen) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) {
1335 ret = FillpSendItem(item, fpcb);
1336 if (ret == ERR_OK) {
1337 stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1338 }
1339 } else {
1340 if (SkipListInsert(&fpcb->send.itemWaitTokenLists, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1341 /* this can't be happen */
1342 FILLP_LOGERR("fillp_sock_id:%d Can't add item <%u,%u> to itemWaitTokenLists", FILLP_GET_SOCKET(fpcb)->index,
1343 item->seqNum, item->dataLen);
1344 FillpFreeBufItem(item);
1345 (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1346 #ifdef SOCK_SEND_SEM
1347 (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1348 #endif /* SOCK_SEND_SEM */
1349 } else {
1350 stb->waitPktCount++;
1351 }
1352 }
1353
1354 return ret;
1355 }
1356
SpungeClearItemWaitTokenList(struct SpungeTokenBucke * stb)1357 static void SpungeClearItemWaitTokenList(struct SpungeTokenBucke *stb)
1358 {
1359 struct HlistNode *fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1360 struct FillpPcb *fpcb = FILLP_NULL_PTR;
1361 struct FillpPcbItem *item = FILLP_NULL_PTR;
1362
1363 while (fpcbNode != FILLP_NULL_PTR) {
1364 fpcb = FillpPcbStbNodeEntry(fpcbNode);
1365 fpcbNode = fpcbNode->next;
1366 item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1367 while (item != FILLP_NULL_PTR) {
1368 stb->waitPktCount--;
1369 /* here item should move to unrecvList, not directly send by udp,
1370 or the sendrate may be over the max send rate */
1371 if (SkipListInsert(&fpcb->send.unrecvList, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1372 FillpFreeBufItem(item);
1373 (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1374 #ifdef SOCK_SEND_SEM
1375 (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1376 #endif /* SOCK_SEND_SEM */
1377 } else if (item->sendCount > 0) {
1378 fpcb->send.unrecvRedunListBytes += item->dataLen;
1379 }
1380 item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1381 }
1382
1383 if (fpcb->send.unrecvList.nodeNum != 0) {
1384 FillpEnableSendTimer(fpcb);
1385 }
1386 }
1387
1388 if (stb->waitPktCount != 0) {
1389 FILLP_LOGERR("waitPktCount %llu is not 0", stb->waitPktCount);
1390 stb->waitPktCount = 0;
1391 }
1392 stb->fpcbCur = HLIST_FIRST(&(stb->tbFpcbLists));
1393 return;
1394 }
1395
SpungeCheckItemWaitTokenList(struct SpungeTokenBucke * stb)1396 void SpungeCheckItemWaitTokenList(struct SpungeTokenBucke *stb)
1397 {
1398 struct HlistNode *fpcbNode = FILLP_NULL_PTR;
1399 struct SkipListNode *node = FILLP_NULL_PTR;
1400 struct FillpPcb *fpcb = FILLP_NULL_PTR;
1401 struct FillpPcbItem *item = FILLP_NULL_PTR;
1402 FILLP_UINT32 fpcbCount = (FILLP_UINT32)stb->tbFpcbLists.size;
1403 FILLP_UINT32 waitListEmptyCount = 0;
1404 FILLP_INT err;
1405
1406 if (stb->waitPktCount == 0) {
1407 return;
1408 }
1409
1410 /* stb->rate change from !0 to 0, need to move all item form itemWaitTokenLists to unSendList */
1411 if (stb->rate == 0) {
1412 SpungeClearItemWaitTokenList(stb);
1413 return;
1414 }
1415
1416 fpcbNode = stb->fpcbCur;
1417 while ((stb->tokenCount > 0) && (stb->waitPktCount > 0) && (waitListEmptyCount < fpcbCount)) {
1418 if (fpcbNode == FILLP_NULL_PTR) {
1419 fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1420 }
1421
1422 fpcb = FillpPcbStbNodeEntry(fpcbNode);
1423 node = SkipListGetPop(&(fpcb->send.itemWaitTokenLists));
1424 if (node == FILLP_NULL_PTR) {
1425 fpcbNode = fpcbNode->next;
1426 waitListEmptyCount++;
1427 continue;
1428 }
1429
1430 item = (struct FillpPcbItem *)node->item;
1431 if (stb->tokenCount < item->dataLen) {
1432 break;
1433 }
1434
1435 stb->waitPktCount--;
1436 (void)SkipListPopValue(&fpcb->send.itemWaitTokenLists);
1437 err = FillpSendItem(item, fpcb);
1438 if (err == ERR_OK) {
1439 stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1440 }
1441 fpcbNode = fpcbNode->next;
1442 waitListEmptyCount = 0;
1443 }
1444
1445 stb->fpcbCur = fpcbNode;
1446 }
1447
SpungeInitTokenBucket(struct SpungeInstance * inst)1448 void SpungeInitTokenBucket(struct SpungeInstance *inst)
1449 {
1450 struct SpungeTokenBucke *stb = &inst->stb;
1451
1452 stb->inst = inst;
1453 stb->lastTime = inst->curTime;
1454 stb->rate = g_resource.flowControl.limitRate;
1455 stb->waitPktCount = 0;
1456 stb->tokenCount = 0;
1457 stb->maxPktSize = (FILLP_UINT32)g_appResource.flowControl.pktSize;
1458
1459 stb->fpcbCur = FILLP_NULL_PTR;
1460 HLIST_INIT(&(stb->tbFpcbLists));
1461
1462 FILLP_TIMING_WHEEL_INIT_NODE(&stb->tockenTimerNode);
1463 stb->tockenTimerNode.cbNode.cb = SpungeTokenTimerCb;
1464 stb->tockenTimerNode.cbNode.arg = (void *)stb;
1465 if (stb->rate != 0) {
1466 stb->tockenTimerNode.interval =
1467 (FILLP_UINT32)(((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1468 if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1469 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1470 }
1471 } else {
1472 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1473 }
1474
1475 FILLP_LOGINF("limite rate:%u, timer_interval:%u, maxPktSize:%u", stb->rate, stb->tockenTimerNode.interval,
1476 stb->maxPktSize);
1477 SpungeEnableTokenTimer(stb);
1478
1479 return;
1480 }
1481
SpungeTokenBucketAddFpcb(struct FillpPcb * fpcb)1482 void SpungeTokenBucketAddFpcb(struct FillpPcb *fpcb)
1483 {
1484 struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1485
1486 if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1487 return;
1488 }
1489
1490 stb = &fpcb->pcbInst->stb;
1491 if (stb->maxPktSize < (FILLP_UINT32)fpcb->pktSize) {
1492 stb->maxPktSize = (FILLP_UINT32)fpcb->pktSize;
1493 }
1494
1495 HLIST_INIT_NODE(&(fpcb->stbNode));
1496 HlistAddTail(&stb->tbFpcbLists, &(fpcb->stbNode));
1497 FILLP_LOGINF("fillp_sock_id:%d, maxPktSize:%u,"
1498 "limitRate:%u",
1499 FILLP_GET_SOCKET(fpcb)->index, stb->maxPktSize, stb->rate);
1500 }
1501
SpungeTokenBucketDelFpcb(struct FillpPcb * fpcb)1502 void SpungeTokenBucketDelFpcb(struct FillpPcb *fpcb)
1503 {
1504 struct HlistNode *node = FILLP_NULL_PTR;
1505 struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1506
1507 if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1508 return;
1509 }
1510
1511 stb = &fpcb->pcbInst->stb;
1512 if ((stb->fpcbCur != FILLP_NULL_PTR) && (stb->fpcbCur == &(fpcb->stbNode))) {
1513 stb->fpcbCur = stb->fpcbCur->next;
1514 }
1515
1516 node = HLIST_FIRST(&(stb->tbFpcbLists));
1517 while (node != FILLP_NULL_PTR) {
1518 if (&(fpcb->stbNode) == node) {
1519 stb->waitPktCount -= (FILLP_ULLONG)fpcb->send.itemWaitTokenLists.nodeNum;
1520 HlistDelete(&(stb->tbFpcbLists), node);
1521 FILLP_LOGINF("fillp_sock_id:%d, limitRate:%u", FILLP_GET_SOCKET(fpcb)->index, stb->rate);
1522 break;
1523 }
1524 node = node->next;
1525 }
1526 }
1527
1528 /* Return 1 if still alive , or return 0 */
SpinstLoopCheckAlive(struct SpungeInstance * inst)1529 static int SpinstLoopCheckAlive(struct SpungeInstance *inst)
1530 {
1531 if (inst->waitTobeCoreKilled && FillpKillCore()) {
1532 inst->waitTobeCoreKilled = FILLP_FALSE;
1533 return 0;
1534 }
1535
1536 return 1;
1537 }
1538
SpinstLoopRecv(struct SpungeInstance * inst)1539 static void SpinstLoopRecv(struct SpungeInstance *inst)
1540 {
1541 struct HlistNode *osSockNode;
1542 int readable = 1;
1543 osSockNode = HLIST_FIRST(&inst->osSockist);
1544 /* Select doesn't work with sendmmsg/recvmmsg, so in that case it is always
1545 set as 1 */
1546 while (osSockNode != FILLP_NULL_PTR) {
1547 struct SockOsSocket *osSock = SockOsListEntry(osSockNode);
1548 if (!g_resource.udp.supportMmsg) {
1549 readable = SysioIsSockReadable((void *)osSock->ioSock);
1550 }
1551 osSockNode = osSockNode->next;
1552
1553 if (readable) {
1554 SpungeDoRecvCycle(osSock, inst);
1555 }
1556 }
1557 }
1558
1559 #if !defined(FILLP_LW_LITEOS)
SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance * inst)1560 static void SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance *inst)
1561 {
1562 FILLP_CHAR threadName[SPUNGE_MAX_THREAD_NAME_LENGTH] = {0};
1563 FILLP_UINT8 random = (FILLP_UINT8)(FILLP_RAND() & 0xFF);
1564 (void)inst;
1565 FILLP_INT ret = sprintf_s(threadName, sizeof(threadName), "%s_%u", "Fillp_core", (FILLP_UINT)random);
1566 if (ret < ERR_OK) {
1567 FILLP_LOGWAR("SpungeInstanceMainThread sprintf_s thread name failed(%d), random(%u)", ret, random);
1568 }
1569 (void)SysSetThreadName(threadName, sizeof(threadName));
1570
1571 #if defined(FILLP_LINUX)
1572 {
1573 pthread_t self;
1574 self = pthread_self();
1575 FILLP_LOGINF("FillP Core threadId:%ld", self);
1576 /* thread resource will be auto recycled
1577 only this detach set if no other thread try to join it */
1578 if (pthread_detach(self)) {
1579 FILLP_LOGERR("Set Detach fail");
1580 }
1581 }
1582 #elif defined(FILLP_WIN32)
1583 FILLP_LOGBUTT("FillP Core threadId:%d", GetCurrentThreadId());
1584 #endif
1585 }
1586 #endif
1587
SpungeInstanceMainThread(void * p)1588 void SpungeInstanceMainThread(void *p)
1589 {
1590 struct SpungeInstance *inst = FILLP_NULL_PTR;
1591 FILLP_BOOL isTimeout;
1592
1593 if (p == FILLP_NULL_PTR) {
1594 FILLP_LOGERR("parameter p is NULL");
1595 return;
1596 }
1597
1598 inst = (struct SpungeInstance *)p;
1599 #if !defined(FILLP_LW_LITEOS)
1600 SpungeSetThreadInfo(inst);
1601 #endif
1602
1603 if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
1604 FILLP_LOGWAR("sem wait failed");
1605 return;
1606 }
1607 while (inst->hasInited) {
1608 SpungeHandleMsgCycle(inst);
1609 SpungeLoopCheckUnsendBox(inst);
1610 if (!SpinstLoopCheckAlive(inst)) {
1611 break;
1612 }
1613 isTimeout = SpungeMainDelay(inst);
1614 SpinstLoopRecv(inst);
1615
1616 if (isTimeout == FILLP_TRUE) {
1617 FillpTimingWheelLoopCheck(&inst->timingWheel, inst->curTime);
1618 }
1619
1620 SpungeCheckItemWaitTokenList(&inst->stb);
1621 }
1622
1623 SpungeDestroyInstance(inst);
1624 }
1625
SpungePushRecvdDataToStack(void * arg)1626 void SpungePushRecvdDataToStack(void *arg)
1627 {
1628 struct FillpPcb *pcb = (struct FillpPcb *)arg;
1629 struct FillpPcbItem *item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1630 while (item != FILLP_NULL_PTR) {
1631 FillpDataToStack(pcb, item);
1632 item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1633 }
1634
1635 FillpEnableDataBurstTimer(pcb);
1636
1637 return;
1638 }
1639
1640 #ifdef __cplusplus
1641 }
1642 #endif
1643