1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "fillp_flow_control.h"
17 #include "res.h"
18 #include "spunge_stack.h"
19 #include "fillp_algorithm.h"
20 #include "fillp_common.h"
21 #include "fillp_output.h"
22 #include "fillp_dfx.h"
23
24 #ifdef __cplusplus
25 extern "C" {
26 #endif
27
28 #define FILLP_RECV_RATE_INDEX_FIRST 0
29 #define FILLP_RECV_RATE_INDEX_NEXT 1
30 #define FILLP_RECV_RATE_INDEX_THIRD 2
31
FillpAlg2GetRedunCount(void * argPcb,void * argItem)32 FILLP_UINT16 FillpAlg2GetRedunCount(void *argPcb, void *argItem)
33 {
34 (void)argPcb;
35 (void)argItem;
36 return 1;
37 }
38
FillpAlg1GetRedunCount(void * argPcb,void * argItem)39 FILLP_UINT16 FillpAlg1GetRedunCount(void *argPcb, void *argItem)
40 {
41 (void)argPcb;
42 (void)argItem;
43 return 1;
44 }
45
FillpRecvRateIsBigger(struct FillpRateSample * rateSample,FILLP_UINT32 maxCnt,FILLP_UINT32 indexK)46 static FILLP_BOOL FillpRecvRateIsBigger(struct FillpRateSample *rateSample, FILLP_UINT32 maxCnt,
47 FILLP_UINT32 indexK)
48 {
49 FILLP_UINT32 indexN = indexK - 1;
50 if (rateSample[indexK].v > rateSample[indexN].v) {
51 struct FillpRateSample tmp = rateSample[indexK];
52 rateSample[indexK] = rateSample[indexN];
53 rateSample[indexN] = tmp;
54 return FILLP_TRUE;
55 }
56 FILLP_UNUSED_PARA(maxCnt);
57
58 return FILLP_FALSE;
59 }
60
FillpUpdateRecvRateSample(struct FillpMaxRateSample * maxRateSample,FILLP_UINT32 rateValue,FILLP_UINT8 rateI)61 void FillpUpdateRecvRateSample(struct FillpMaxRateSample *maxRateSample, FILLP_UINT32 rateValue,
62 FILLP_UINT8 rateI)
63 {
64 FILLP_UINT32 index;
65 FILLP_UINT32 indexK;
66 struct FillpRateSample val;
67 struct FillpRateSample *rateSample = maxRateSample->rateSample;
68
69 val.i = rateI;
70 val.v = rateValue;
71
72 // m->s[] stores few numbers of maximal rates, when do update, if the index is already in s->m[], just resort
73 // or , we need to insert a new one
74 for (index = 0; index < maxRateSample->maxCnt; index++) {
75 if (rateSample[index].i == val.i) {
76 break;
77 }
78 }
79 // Now m->s[index] not include pack_index, means we need to update the m->s[]
80 if (index >= maxRateSample->maxCnt) {
81 for (indexK = 0; indexK < maxRateSample->maxCnt; indexK++) {
82 if (val.v > rateSample[indexK].v) {
83 struct FillpRateSample tmp = rateSample[indexK];
84 rateSample[indexK] = val;
85 val = tmp;
86 }
87 }
88 return;
89 }
90
91 // Don't need to re-sort the whole list, because the list always be sorted already
92 // such as if val.v > m->s[index].v, then just need to update the upper ones
93 if (rateSample[index].v > val.v) { // The new value is lighter , then float up
94 rateSample[index].v = val.v;
95
96 for (indexK = index; indexK < maxRateSample->maxCnt - 1; indexK++) {
97 FILLP_UINT32 indexN = indexK + 1;
98 if (rateSample[indexK].v < rateSample[indexN].v) {
99 struct FillpRateSample tmp = rateSample[indexK];
100 rateSample[indexK] = rateSample[indexN];
101 rateSample[indexN] = tmp;
102 continue;
103 }
104
105 break;
106 }
107 } else { // The new value is bigger, then sink down
108 rateSample[index].v = val.v;
109
110 for (indexK = index; indexK > 0; indexK--) {
111 if (FillpRecvRateIsBigger(rateSample, maxRateSample->maxCnt - 1, indexK)) {
112 continue;
113 }
114
115 break;
116 }
117 }
118
119 FILLP_LOGDBG("max expired pack_index %u the max is %u the 2ed max is %u the 3th max is %u,recv rate is %u", rateI,
120 rateSample[FILLP_RECV_RATE_INDEX_FIRST].v, rateSample[FILLP_RECV_RATE_INDEX_NEXT].v,
121 rateSample[FILLP_RECV_RATE_INDEX_THIRD].v, rateValue);
122 }
123
FillpAppLimitedStatus(struct FillpPcb * pcb,FILLP_UINT32 beginPktNum,FILLP_UINT32 endPktNum)124 FILLP_BOOL FillpAppLimitedStatus(struct FillpPcb *pcb, FILLP_UINT32 beginPktNum, FILLP_UINT32 endPktNum)
125 {
126 struct FillpHashLlist *mapList = &pcb->send.pktSeqMap;
127 struct Hlist *list = FILLP_NULL_PTR;
128 struct HlistNode *pos = FILLP_NULL_PTR;
129 struct FillpPcbItem *item = FILLP_NULL_PTR;
130 FILLP_UINT32 i, j;
131 FILLP_UINT32 mapLevel;
132 FILLP_BOOL appLimited = FILLP_FALSE;
133
134 if (!FillpNumIsbigger(endPktNum, beginPktNum)) {
135 return appLimited;
136 }
137
138 for (i = beginPktNum, j = 0; !FillpNumIsbigger(i, endPktNum) && j < mapList->count; i++, j++) {
139 mapLevel = (FILLP_UINT32)(i & mapList->hashModSize);
140 list = &mapList->hashMap[mapLevel];
141 pos = HLIST_FIRST(list);
142 while (pos != FILLP_NULL_PTR) {
143 item = FillpPcbPktSeqMapNodeEntry(pos);
144 if (FillpNumIsbigger(item->pktNum, endPktNum)) {
145 break;
146 }
147
148 if ((!FillpNumIsbigger(beginPktNum, item->pktNum)) &&
149 UTILS_FLAGS_CHECK(item->flags, FILLP_ITEM_FLAGS_APP_LIMITED)) {
150 appLimited = FILLP_TRUE;
151 break;
152 }
153 pos = pos->next;
154 }
155
156 if (appLimited == FILLP_TRUE) {
157 break;
158 }
159 }
160
161 return appLimited;
162 }
163
FillpCalSendInterval(struct FillpPcb * pcb)164 void FillpCalSendInterval(struct FillpPcb *pcb)
165 {
166 struct FillpFlowControl *flowControl = &pcb->send.flowControl;
167 struct FtSocket *sock = FILLP_GET_SOCKET(pcb);
168
169 if (sock->resConf.flowControl.constRateEnbale) {
170 flowControl->sendRate = sock->resConf.flowControl.maxRate;
171 }
172
173 if (flowControl->sendRate == 0) {
174 flowControl->sendInterval = FILLP_NULL;
175 return;
176 }
177
178 /* The rate is calculated based on Kbps, hence multiplied by 8 and 1000 */
179 flowControl->sendInterval = (FILLP_LLONG)(pcb->pktSize * FILLP_FC_IN_KBPS * FILLP_FC_IN_BIT);
180 /* need round up to avoid sendInterval is smaller */
181 flowControl->sendInterval = FILLP_DIV_ROUND_UP(flowControl->sendInterval, (FILLP_LLONG)flowControl->sendRate);
182 if (flowControl->sendInterval < FILLP_NULL) {
183 flowControl->sendInterval = FILLP_NULL;
184 }
185
186 pcb->sendTimerNode.interval = (FILLP_UINT32)(flowControl->sendInterval / FILLP_FC_IN_BIT);
187 FILLP_LOGDBG("Send interval %lld, timer_interval:%u", flowControl->sendInterval, pcb->sendTimerNode.interval);
188
189 return;
190 }
191
FillpFcTailProtected(struct FillpPcb * pcb,struct FillpPktPack * pack)192 void FillpFcTailProtected(struct FillpPcb *pcb, struct FillpPktPack *pack)
193 {
194 struct FillpTailLostProtected *tailProtect = FILLP_NULL_PTR;
195 FILLP_LLONG deltaUs;
196 FILLP_BOOL isDataWaitedEmpty;
197 FILLP_UINT32 infBytes = 0;
198 FILLP_UINT32 infCap = 0;
199
200 struct FillpPktHead *pktHdr = (struct FillpPktHead *)pack->head;
201 FILLP_UINT32 ackSeqNum = pktHdr->seqNum;
202 FILLP_UINT32 lostSeqNum = pack->lostSeq;
203
204 FILLP_UINT32 unackNum = pcb->send.unackList.count;
205 FILLP_UINT32 unsendSize =
206 pcb->send.unrecvList.nodeNum + pcb->send.itemWaitTokenLists.nodeNum + pcb->send.redunList.nodeNum;
207 isDataWaitedEmpty = (unsendSize == 0);
208
209 unsendSize += pcb->send.unSendList.size;
210 isDataWaitedEmpty = (unsendSize == 0) && (SpungeConnCheckUnsendBoxEmpty(FILLP_GET_CONN(pcb)) == FILLP_TRUE);
211
212 deltaUs = pcb->pcbInst->curTime - pcb->send.lastSendTs;
213
214 /* ackSeqNum equal to lostSeqNum, peer doesn't recv valid packet which can be give to app */
215 tailProtect = &pcb->send.tailProtect;
216 if ((ackSeqNum == lostSeqNum) && (ackSeqNum == tailProtect->lastPackSeq) && (unackNum != 0) &&
217 (pack->rate == 0) && isDataWaitedEmpty && (pcb->statistics.debugPcb.curPackDeltaUs != 0) &&
218 (deltaUs >= pcb->statistics.pack.packIntervalBackup)) {
219 tailProtect->samePackCount++;
220 if (tailProtect->samePackCount >= tailProtect->judgeThreshold) {
221 FILLP_LOGDTL("fillp_sock_id:%d tail protection active,Threshold:%u,infBytes:%u,"
222 "infCap:%u,unSendList:%u,unackList:%u, ackSeqNum%u",
223 FILLP_GET_SOCKET(pcb)->index, tailProtect->judgeThreshold, infBytes, infCap,
224 pcb->send.unSendList.size, pcb->send.unackList.count, ackSeqNum);
225 FillpMoveUnackToUnrecv(ackSeqNum, pcb->send.seqNum, pcb, FILLP_FALSE);
226 tailProtect->judgeThreshold = tailProtect->maxJudgeThreshold;
227 tailProtect->samePackCount = FILLP_NULL;
228 }
229 } else {
230 pcb->send.tailProtect.judgeThreshold = tailProtect->minJudgeThreshold;
231 pcb->send.tailProtect.samePackCount = FILLP_NULL;
232 pcb->send.tailProtect.lastPackSeq = ackSeqNum;
233 }
234
235 return;
236 }
237
FillpFcPackInput(struct FillpPcb * pcb,struct FillpPktPack * pack)238 void FillpFcPackInput(struct FillpPcb *pcb, struct FillpPktPack *pack)
239 {
240 if (pcb->algFuncs.analysisPack != FILLP_NULL_PTR) {
241 pcb->algFuncs.analysisPack(pcb, (void *)pack);
242 }
243
244 if (!(pack->flag & FILLP_PACK_FLAG_REQURE_RTT)) {
245 FillpFcTailProtected(pcb, pack);
246 }
247
248 return;
249 }
250
FillpFcNackInput(struct FillpPcb * pcb,struct FillpPktNack * nack)251 void FillpFcNackInput(struct FillpPcb *pcb, struct FillpPktNack *nack)
252 {
253 if (pcb->algFuncs.analysisNack != FILLP_NULL_PTR) {
254 pcb->algFuncs.analysisNack(pcb, (void *)nack);
255 }
256 }
257
FillpGetAlgFun(struct FillpPcb * pcb)258 static int FillpGetAlgFun(struct FillpPcb *pcb)
259 {
260 switch (pcb->fcAlg) {
261 case FILLP_SUPPORT_ALG_BASE:
262 pcb->algFuncs = g_fillpAlg0;
263 break;
264 case FILLP_SUPPORT_ALG_3:
265 pcb->algFuncs = g_fillpAlg0;
266 break;
267 default:
268 FILLP_LOGERR("flow control not set");
269 return -1;
270 }
271 return 0;
272 }
273
FillpFcInit(struct FillpPcb * pcb)274 FILLP_INT FillpFcInit(struct FillpPcb *pcb)
275 {
276 FILLP_INT ret = ERR_OK;
277
278 if (pcb == FILLP_NULL_PTR) {
279 FILLP_LOGERR("pcb null");
280 return -1;
281 }
282
283 if (pcb->send.slowStart) {
284 /* Sender interval, be used to control the sending rate kbits/s */
285 pcb->send.flowControl.sendRate = FILLP_INITIAL_RATE;
286 FILLP_LOGDBG("slowStart:%u init_rate:%u", pcb->send.slowStart, pcb->send.flowControl.sendRate);
287 } else {
288 /* The maxRate configured by the user is in Mbps, hence multiplied by
289 100 to get the value in Kbps */
290 pcb->send.flowControl.sendRate = g_resource.flowControl.maxRate;
291 FILLP_LOGDBG("slowStart not enabled, init_rate:%u", pcb->send.flowControl.sendRate);
292 }
293
294 pcb->send.flowControl.sendTime = 0;
295 pcb->send.flowControl.sendRateLimit = 0;
296 pcb->send.flowControl.remainBytes = 0;
297 pcb->send.flowControl.lastCycleNoEnoughData = FILLP_FALSE;
298 pcb->send.flowControl.sendOneNoData = FILLP_TRUE;
299
300 pcb->send.tailProtect.lastPackSeq = 0;
301 pcb->send.tailProtect.samePackCount = 0;
302
303 pcb->statistics.keepAlive.lastRecvTime = pcb->pcbInst->curTime;
304 pcb->statistics.keepAlive.lastDataRecvTime = pcb->pcbInst->curTime;
305
306 pcb->send.flowControl.fcAlg = FILLP_NULL_PTR;
307 FILLP_LOGERR("fillp_sock_id:%d, fc alg:%xh, characters:%xh, peer_alg:%xh, peerCharacters:%xh",
308 FILLP_GET_SOCKET(pcb)->index, pcb->fcAlg, pcb->characters, FILLP_GET_CONN(pcb)->peerFcAlgs,
309 FILLP_GET_CONN(pcb)->peerCharacters);
310 if (FillpGetAlgFun(pcb) != 0) {
311 return -1;
312 }
313
314 FillpAdjustFcParamsByRtt(pcb);
315
316 if (pcb->algFuncs.fcInit != FILLP_NULL_PTR) {
317 ret = pcb->algFuncs.fcInit(pcb);
318 }
319
320 return ret;
321 }
322
FillpFcDeinit(struct FillpPcb * pcb)323 void FillpFcDeinit(struct FillpPcb *pcb)
324 {
325 if (pcb->algFuncs.fcDeinit != FILLP_NULL_PTR) {
326 pcb->algFuncs.fcDeinit(pcb);
327 }
328 pcb->send.flowControl.fcAlg = FILLP_NULL_PTR;
329 }
330
331 /* recv a data packet */
FillpFcDataInput(struct FillpPcb * pcb,FILLP_CONST struct FillpPktHead * pkt)332 void FillpFcDataInput(struct FillpPcb *pcb, FILLP_CONST struct FillpPktHead *pkt)
333 {
334 pcb->statistics.traffic.totalRecved++;
335
336 if (pcb->statistics.traffic.totalRecved == 1) {
337 FILLP_LOGDBG("fillp_sock_id:%d "
338 "First data receiving time =%lld, recv seq num = %u, recv pkt num = %u \r\n",
339 FILLP_GET_SOCKET(pcb)->index, pcb->pcbInst->curTime, pcb->recv.seqNum, pcb->recv.pktNum);
340 }
341
342 pcb->statistics.traffic.totalRecvedBytes += ((FILLP_UINT32)pkt->dataLen);
343 pcb->statistics.pack.periodRecvedOnes++;
344 pcb->statistics.pack.periodRecvBits += FILLP_FC_VAL_IN_BITS((FILLP_ULLONG)pkt->dataLen);
345
346 return;
347 }
348
349 /* discard a data packet */
FillpFcRecvDropOne(struct FillpPcb * pcb)350 void FillpFcRecvDropOne(struct FillpPcb *pcb)
351 {
352 pcb->statistics.pack.periodDroped++;
353 pcb->statistics.traffic.totalDroped++;
354
355 return;
356 }
357
358 /* recv an packet outof order */
FillpFcRecvOutOfOrder(struct FillpPcb * pcb)359 void FillpFcRecvOutOfOrder(struct FillpPcb *pcb)
360 {
361 pcb->statistics.traffic.totalOutOfOrder++;
362
363 return;
364 }
365
366 /* calculate the lost packets on recv side */
FillpFcRecvLost(struct FillpPcb * pcb,FILLP_UINT32 ones)367 void FillpFcRecvLost(struct FillpPcb *pcb, FILLP_UINT32 ones)
368 {
369 pcb->statistics.traffic.totalRecvLost += ones;
370
371 return;
372 }
373
FillpFcCycle(void * arg)374 void FillpFcCycle(void *arg)
375 {
376 struct FillpPcb *pcb = (struct FillpPcb *)arg;
377 /* The unit of the time returned here is micro seconds */
378 FILLP_LLONG detaTime;
379 struct FtNetconn *netconn = FILLP_GET_CONN(pcb);
380 struct FtSocket *sock;
381
382 sock = (struct FtSocket *)netconn->sock;
383
384 if (sock->isListenSock) {
385 FILLP_LOGERR("Listen socket should not hit here!!!");
386 return;
387 }
388
389 detaTime = pcb->pcbInst->curTime - pcb->statistics.keepAlive.lastRecvTime;
390
391 if (detaTime >= (FILLP_LLONG)FILLP_UTILS_MS2US((FILLP_LLONG)sock->resConf.common.keepAliveTime)) {
392 FILLP_LOGERR("Keep alive timeout, fillp_sock_id:%d,detaTime:%lld,keepAliveTime:%u(ms)",
393 sock->index, detaTime, sock->resConf.common.keepAliveTime);
394
395 FillpDfxSockLinkAndQosNotify(sock, FILLP_DFX_LINK_KEEPALIVE_TIMEOUT);
396 SpungeShutdownSock(sock, SPUNGE_SHUT_RDWR);
397 sock->errEvent |= SPUNGE_EPOLLERR;
398 SpungeEpollEventCallback(sock, (FILLP_INT)SPUNGE_EPOLLIN | (FILLP_INT)SPUNGE_EPOLLERR, 1);
399 SpungeConnClosed(FILLP_GET_CONN(pcb));
400 return;
401 }
402
403 pcb->keepAliveTimerNode.interval =
404 (FILLP_UINT32)(FILLP_UTILS_MS2US((FILLP_LLONG)sock->resConf.common.keepAliveTime) - detaTime);
405 FILLP_LOGDTL("update the keep alive interval to %u, fillp_sock_id:%d, detaTime:%lld, keepAliveTime:%u(ms)",
406 pcb->keepAliveTimerNode.interval, sock->index, detaTime, sock->resConf.common.keepAliveTime);
407 FillpEnableKeepAliveTimer(pcb);
408
409 return;
410 }
411
412 #ifdef __cplusplus
413 }
414 #endif
415