• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <stdlib.h>
16 
17 #include "nstackx_dfile_session.h"
18 
19 #include "nstackx_dfile_log.h"
20 #include "nstackx_socket.h"
21 
/* Tag passed as the first argument to the DFILE_LOG* calls in this file. */
#define TAG "nStackXDFile"

#define WAIT_DATA_FRAME_WAIT_US            5 /* Spend 5us to read one file data frame. */

/* Value returned by GetIovListSize(): how many IovList entries AllocIovList() creates per call. */
#define MAX_NR_IOVCNT       20
/* NOTE(review): presumably the largest UDP payload over IPv4 (65535 - 8 - 20); not referenced in the visible code. */
#define MAX_UDP_PAYLOAD     65507
/* Value returned by GetMaxSendCount(): upper bound on blocks gathered before each send attempt. */
#define MAX_SEND_COUNT      1
29 
GetIovListSize(void)30 static inline uint32_t GetIovListSize(void)
31 {
32     // updated this value from 40 to 20 now
33     return  MAX_NR_IOVCNT;
34 }
35 
/*
 * Allocate GetIovListSize() IovList entries with a single malloc and append
 * every entry to the tail of `head`, with addr cleared and len set to 0.
 *
 * Returns NSTACKX_EOK on success, or NSTACKX_ENOMEM when the allocation
 * fails (nothing is added to `head` in that case).
 */
static int32_t AllocIovList(List *head)
{
    const uint32_t count = GetIovListSize();
    IovList *batch = malloc(sizeof(*batch) * count);

    if (batch == NULL) {
        return NSTACKX_ENOMEM;
    }

    IovList *const batchEnd = batch + count;
    for (IovList *iov = batch; iov != batchEnd; iov++) {
        iov->addr = NULL;
        iov->len = 0;
        ListInsertTail(head, &iov->entry);
    }

    return NSTACKX_EOK;
}
50 
51 #ifndef BUILD_FOR_WINDOWS
52 __attribute__((unused))
53 #endif
GetFreeIovList(DFileSession * s,int32_t tid)54 static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
55 {
56     List *p = &s->freeIovList[tid];
57     List *q = NULL;
58 
59     if (ListIsEmpty(p)) {
60         int32_t err = AllocIovList(p);
61         if (err != NSTACKX_EOK) {
62             return NULL;
63         }
64     }
65 
66     q = ListPopFront(p);
67     return (IovList *)q;
68 }
69 
/*
 * Unlink and free every node still linked on `head`.
 * Each node is treated as a BlockFrame: its fileDataFrame buffer is freed
 * first, then the BlockFrame itself.
 *
 * `s` and `tid` are accepted but not used by this implementation.
 */
void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
{
    List *cur = NULL;
    List *upcoming = NULL;

    (void)s;
    (void)tid;

    LIST_FOR_EACH_SAFE(cur, upcoming, head) {
        BlockFrame *frame = (BlockFrame *)cur;

        ListRemoveNode(cur);
        free(frame->fileDataFrame);
        free(frame);
    }
}
85 
/*
 * Send the not-yet-sent tail of one block over `socket`, resuming at
 * block->sendLen.
 *
 * - Whole remainder sent: the block is unlinked from its list and freed, the
 *   send counters are incremented, and the SocketSend() result is returned.
 * - Partial send: block->sendLen advances by the amount sent, eAgainCount is
 *   incremented, and NSTACKX_EAGAIN is returned.
 * - Nothing sent and errno == EAGAIN: eAgainCount is incremented and
 *   NSTACKX_EAGAIN is returned.
 * - Anything else: the failure is logged and NSTACKX_EFAILED is returned.
 *
 * `len` is the total frame length in bytes; `p` is the list node of `block`.
 */
static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
{
    DFileSession *session = peerInfo->session;
    int32_t sent = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);

    if (sent <= 0) {
        if (errno == EAGAIN) {
            NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
            return NSTACKX_EAGAIN;
        }
        DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", sent, errno);
        return NSTACKX_EFAILED;
    }

    if (sent != (int32_t)(len - block->sendLen)) {
        /* Short write: remember the progress so the next call resumes here. */
        NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
        block->sendLen = block->sendLen + (uint32_t)sent;
        return NSTACKX_EAGAIN;
    }

    /* The block is fully on the wire: release it and account for it. */
    block->sendLen = 0;
    ListRemoveNode(p);
    free(block->fileDataFrame);
    free(block);
    NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
    NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
    NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);

    return sent;
}
114 
/*
 * Bookkeeping after one block has been sent successfully on the UDP path:
 * unlink node `p`, free the frame buffer `f` and the `block` that carried
 * it, then increment the per-peer and per-session send counters.
 */
static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
    BlockFrame *block)
{
    /* Release the block. */
    ListRemoveNode(p);
    free(f);
    free(block);

    /* Account for it. */
    NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
    NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
    NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
}
125 
/*
 * Send every block queued on `head`, in list order.
 *
 * With the TCP capability each block goes through TcpSendFileDataFrame()
 * (on the accept socket for a server session, otherwise on socket[0]).
 * Without it each frame is sent with a single SocketSend() on the socket
 * selected by peerInfo->socketIndex.
 *
 * Returns:
 * - NSTACKX_EAGAIN as soon as a send would block; blocks not yet sent stay
 *   linked on `head` for the caller.
 * - otherwise the result of the last send attempt. Any blocks still linked on
 *   `head` at that point (for example after a hard failure) are freed through
 *   DestroyIovList().
 * - NSTACKX_EOK when `head` is empty and nothing was attempted.
 */
static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    List *p = NULL;
    List *n = NULL;
    BlockFrame *block = NULL;
    FileDataFrameZS *f = NULL;
    /* Start from a defined value: with an empty list the loop body never runs. */
    int32_t ret = NSTACKX_EOK;
    uint16_t len;
    Socket *socket = session->socket[0];

    if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
        socket = session->acceptSocket;
    }

    LIST_FOR_EACH_SAFE(p, n, head) {
        block = (BlockFrame *)p;
        f = (FileDataFrameZS *)(void *)block->fileDataFrame;
        /* Total bytes to send: the length field from the header plus the header itself. */
        len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
        if (CapsTcp(session)) {
            ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
            if (ret == NSTACKX_EFAILED) {
                break;
            } else if (ret == NSTACKX_EAGAIN) {
                /* Keep the remaining blocks on the list for a later retry. */
                return ret;
            }
        } else {
            ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
            if (ret > 0) {
                UdpSendFileDataSuccess(session, peerInfo, p, f, block);
            } else if (ret == NSTACKX_EAGAIN) {
                NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
                return ret;
            } else {
                DFILE_LOGE(TAG, "socket sendto failed");
                break;
            }
        }
    }

    /* Free whatever is still queued (nothing after a fully successful pass). */
    DestroyIovList(head, session, tid);

    return ret;
}
169 
/*
 * Forwarding wrapper: passes all arguments to SendFileDataFrame() and
 * returns its result unchanged.
 */
static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    const int32_t result = SendFileDataFrame(session, peerInfo, head, tid);

    return result;
}
174 
/*
 * Initialise `head` as an empty list, then move up to `maxCount` nodes from
 * the front of `unsent` to the tail of `head`, keeping their order.
 *
 * Returns the number of nodes moved.
 */
static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
{
    int32_t moved;

    ListInitHead(head);
    for (moved = 0; moved < maxCount; moved++) {
        if (ListIsEmpty(unsent)) {
            break;
        }
        List *node = ListPopFront(unsent);
        if (node == NULL) {
            break;
        }
        ListInsertTail(head, node);
    }

    return moved;
}
191 
GetMaxSendCount(void)192 static int32_t GetMaxSendCount(void)
193 {
194     return MAX_SEND_COUNT;
195 }
196 
/*
 * Core send loop for one sender thread.
 *
 * `head` already holds `count` blocks taken from the caller's unsent list.
 * Each round tops the list up from the file manager (up to GetMaxSendCount()
 * blocks), sends the list through SendFileDataFrameEx(), and decides whether
 * another round is allowed.
 *
 * Returns:
 * - NSTACKX_EFAILED when no PeerInfo exists for `socketIndex`;
 * - NSTACKX_EOK when there was nothing to send in a round;
 * - otherwise the result of the last SendFileDataFrameEx() call.
 *
 * Note: a negative FileManagerFileRead() result is logged and ends the
 * top-up loop, but the value is then overwritten by the code that follows.
 */
static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        return NSTACKX_EFAILED;
    }

    BlockFrame *block = NULL;
    int32_t ret;
    int32_t maxCount = GetMaxSendCount();
    int32_t keepSending;

    do {
        /* Top up the batch from the file manager. */
        while (count < maxCount && FileManagerHasPendingData(session->fileManager)) {
            ret = FileManagerFileRead(session->fileManager, tid, &block, maxCount - count);
            if (ret < 0) {
                DFILE_LOGE(TAG, "FileManagerFileRead failed %d", ret);
                break;
            }
            if (ret > 0) {
                /* Walk the returned chain and link every block onto the batch. */
                while (block != NULL) {
                    List *following = block->list.next;
                    ListInsertTail(head, &block->list);
                    block = (BlockFrame *)(void *)following;
                }
                count += ret;
            } else {
                /* Nothing ready yet: wait briefly, then try again. */
                NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
                (void)usleep(WAIT_DATA_FRAME_WAIT_US);
            }
        }

        if (count == 0) {
            NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
            ret = NSTACKX_EOK;
            break;
        }

        ret = SendFileDataFrameEx(session, peerInfo, head, tid);
        if (ret <= 0) {
            break;
        }

        count = 0;
        maxCount = GetMaxSendCount();
        if (CapsTcp(session)) {
            /* TCP: another round only when session->sendRemain is zero. */
            keepSending = session->sendRemain ? 0 : 1;
        } else {
            /* UDP: stay under the amended send rate and stop once the session is closing. */
            keepSending = (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag);
        }
    } while (keepSending && (session->stopSendCnt[tid] == 0));

    return ret;
}
244 
245 
246 /*
247  *  * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
248  *   * if backpress frame count is zero then send packet normally
249  *    */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)250 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
251 {
252     uint32_t fileProcessCnt;
253     uint32_t sleepTime;
254     uint32_t stopCnt;
255     PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
256     if (peerInfo == NULL) {
257         return;
258     }
259 
260     if (session->stopSendCnt[tid] != 0) {
261         if (PthreadMutexLock(&session->backPressLock) != 0) {
262             DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
263             return;
264         }
265 
266         stopCnt = session->stopSendCnt[tid];
267         if (stopCnt == 0) {
268             if (PthreadMutexUnlock(&session->backPressLock) != 0) {
269                 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
270             }
271             return;
272         }
273 
274         /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
275         fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
276 
277         session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
278             fileProcessCnt) : 0;
279 
280         if (PthreadMutexUnlock(&session->backPressLock) != 0) {
281             DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
282             return;
283         }
284 
285         sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
286 
287 #ifndef NSTACKX_WITH_LITEOS
288         DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
289              fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
290 #endif
291         (void)usleep(sleepTime);
292     }
293 }
294 
/*
 * Entry point for sending file data from sender thread `tid` on the peer
 * selected by `socketIndex`.
 *
 * Applies the back-pressure throttle, moves up to GetMaxSendCount() blocks
 * from `unsent` into a temporary list and hands that list to
 * DoSendDataFrame(). If the send would block, the temporary list is moved
 * back to `unsent` with ListMove().
 *
 * Returns NSTACKX_EFAILED when no PeerInfo exists for `socketIndex`,
 * NSTACKX_EOK when the peer's amendSendRate is 0 (nothing is sent), otherwise
 * the result of DoSendDataFrame().
 */
int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        return NSTACKX_EFAILED;
    }
    if (peerInfo->amendSendRate == 0) {
        return NSTACKX_EOK;
    }

    CheckSendByBackPress(session, tid, socketIndex);

    List pending;
    int32_t queued = CheckUnsentList(unsent, &pending, GetMaxSendCount());
    int32_t ret = DoSendDataFrame(session, &pending, queued, tid, socketIndex);
    if (ret == NSTACKX_EAGAIN) {
        ListMove(&pending, unsent);
    }

    return ret;
}
318 
/*
 * Send one queued control frame.
 *
 * Without the TCP capability the whole frame is sent with one SocketSend()
 * on session->socket[queueNode->socketIndex]; a non-positive result other
 * than NSTACKX_EAGAIN is logged and mapped to NSTACKX_EFAILED.
 *
 * With the TCP capability the send resumes at queueNode->sendLen (on the
 * accept socket for a server session, otherwise on socket[0]):
 * - remainder fully sent: sendLen is reset to 0 and the SocketSend() result
 *   is returned;
 * - partial send: sendLen advances and NSTACKX_EAGAIN is returned;
 * - nothing sent with errno == EAGAIN: NSTACKX_EAGAIN;
 * - anything else: logged, NSTACKX_EFAILED.
 */
int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
{
    int32_t ret;

    if (!CapsTcp(session)) {
        uint8_t socketIndex = queueNode->socketIndex;

        ret = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
        if (ret <= 0 && ret != NSTACKX_EAGAIN) {
            DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
            ret = NSTACKX_EFAILED;
        }
        return ret;
    }

    Socket *socket = NULL;
    if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
        socket = session->acceptSocket;
    } else {
        socket = session->socket[0];
    }

    ret = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
    if (ret <= 0) {
        if (errno == EAGAIN) {
            return NSTACKX_EAGAIN;
        }
        DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
        return NSTACKX_EFAILED;
    }

    if (ret == (int32_t)(queueNode->length - queueNode->sendLen)) {
        /* Frame completely sent. */
        queueNode->sendLen = 0;
        return ret;
    }

    /* Short write: record the progress and ask the caller to retry. */
    queueNode->sendLen = queueNode->sendLen + (uint32_t)ret;
    return NSTACKX_EAGAIN;
}
352 
/*
 * Drain the session's outbound control-frame queue.
 *
 * `*preQueueNode` may carry a frame left over from an earlier call; it is
 * sent before anything is popped from the queue. Frames are sent one at a
 * time through SendControlFrame() until the queue is empty, a send does not
 * succeed, a mutex operation fails, or session->closeFlag becomes set.
 *
 * On return:
 * - NSTACKX_EAGAIN: `*preQueueNode` holds the frame to retry next time;
 * - any other result: `*preQueueNode` is NULL and the current frame (if any)
 *   has been passed to DestroyQueueNode().
 */
int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
{
    QueueNode *node = *preQueueNode;
    int32_t ret;

    for (;;) {
        if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex lock failed");
            ret = NSTACKX_EFAILED;
            break;
        }
        /* Only pop a new frame when there is no leftover frame to send. */
        if (node == NULL && session->outboundQueueSize) {
            node = (QueueNode *)ListPopFront(&session->outboundQueue);
            session->outboundQueueSize--;
        }
        if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex unlock failed");
            ret = NSTACKX_EFAILED;
            break;
        }
        if (node == NULL) {
            ret = NSTACKX_EOK;
            break;
        }

        uint32_t socketIndex = node->socketIndex;
        Socket *target = session->socket[socketIndex];
        /* A UDP server socket takes its destination from the queued frame. */
        if (target->protocol == NSTACKX_PROTOCOL_UDP && target->isServer == NSTACKX_TRUE) {
            target->dstAddr = node->peerAddr;
        }

        ret = SendControlFrame(session, node);
        if (ret <= 0) {
            break;
        }

        /* Send ok, try to get next frame. */
        DestroyQueueNode(node);
        node = NULL;
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);

        if (session->closeFlag) {
            break;
        }
    }

    if (ret == NSTACKX_EAGAIN) {
        *preQueueNode = node;
        return ret;
    }

    *preQueueNode = NULL;
    DestroyQueueNode(node);
    return ret;
}
403 
/*
 * Receive `length` more bytes from the TCP peer, writing them at
 * buffer + session->recvLen and advancing session->recvLen as data arrives.
 * A server session reads from the accept socket, any other session from
 * socket[0].
 *
 * Returns:
 * - the number of bytes received by this call once `length` is reached;
 * - NSTACKX_PEER_CLOSE when SocketRecv() returns 0;
 * - NSTACKX_EAGAIN when SocketRecv() fails with errno == EAGAIN (bytes
 *   already received remain counted in session->recvLen);
 * - NSTACKX_EFAILED on any other receive failure.
 */
int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    Socket *socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ? session->acceptSocket : session->socket[0];
    int received = 0;

    while (received < (int32_t)length) {
        int32_t chunk = SocketRecv(socket, buffer + session->recvLen, length - (size_t)received, srcAddr, addrLen);
        if (chunk == 0) {
            return NSTACKX_PEER_CLOSE;
        }
        if (chunk < 0) {
            return (errno != EAGAIN) ? NSTACKX_EFAILED : NSTACKX_EAGAIN;
        }
        received = received + chunk;
        session->recvLen = session->recvLen + (uint32_t)chunk;
    }

    return received;
}
435 
/*
 * Receive one complete DFile frame (header plus payload) over TCP.
 *
 * Progress is kept in session->recvLen, so a call that stopped early can be
 * repeated: the header is completed first, then the payload whose size is
 * taken from the header's length field.
 *
 * NOTE(review): the header is parsed from session->recvBuffer while the data
 * is received into `buffer`; this presumably relies on callers passing
 * session->recvBuffer as `buffer` - confirm against the call sites.
 *
 * Returns:
 * - session->recvLen (total bytes of the frame) once the frame is complete;
 * - the non-positive TcpSocketRecv() result when a receive step does not
 *   complete;
 * - NSTACKX_EFAILED when the announced payload would not fit in
 *   NSTACKX_RECV_BUFFER_LEN together with the header.
 */
int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    int32_t ret;
    uint16_t payloadLen;
    DFileFrameHeader *frameHeader = NULL;
    size_t length = sizeof(DFileFrameHeader);

    /* Step 1: make sure the full frame header has been received. */
    if (session->recvLen < length) {
        ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    frameHeader = (DFileFrameHeader *)(session->recvBuffer);
    payloadLen = ntohs(frameHeader->length);
    if (payloadLen > NSTACKX_RECV_BUFFER_LEN - length) {
        /* `length` is a size_t; cast it so the argument matches the %u conversion. */
        DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", (uint32_t)length,
             session->recvLen, payloadLen, frameHeader->type);
        return NSTACKX_EFAILED;
    }

    /* Step 2: receive whatever part of the payload is still missing. */
    if ((session->recvLen - length) < payloadLen) {
        ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    return (int32_t)(session->recvLen);
}
467