• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nstackx_dfile_session.h"
17 
18 #include "nstackx_dfile_log.h"
19 #include "nstackx_socket.h"
20 
21 #define TAG "nStackXDFile"
22 
23 #define WAIT_DATA_FRAME_WAIT_US            5 /* Spend 5us to read one file data frame. */
24 
25 #define MAX_NR_IOVCNT       20
26 #define MAX_UDP_PAYLOAD     65507
27 
GetIovListSize(void)28 static inline uint32_t GetIovListSize(void)
29 {
30     // updated this value from 40 to 20 now
31     return  MAX_NR_IOVCNT;
32 }
33 
AllocIovList(List * head)34 static int32_t AllocIovList(List *head)
35 {
36     uint32_t size = GetIovListSize();
37     IovList *ptr = malloc(sizeof(IovList) * size);
38     if (ptr == NULL) {
39         return NSTACKX_ENOMEM;
40     }
41     for (uint32_t i = 0; i < size; i++) {
42         ptr[i].addr = NULL;
43         ptr[i].len = 0;
44         ListInsertTail(head, &ptr[i].entry);
45     }
46     return NSTACKX_EOK;
47 }
48 
49 #ifndef BUILD_FOR_WINDOWS
50 __attribute__((unused))
51 #endif
GetFreeIovList(DFileSession * s,int32_t tid)52 static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
53 {
54     List *p = &s->freeIovList[tid];
55     List *q = NULL;
56 
57     if (ListIsEmpty(p)) {
58         int32_t err = AllocIovList(p);
59         if (err != NSTACKX_EOK) {
60             return NULL;
61         }
62     }
63 
64     q = ListPopFront(p);
65     return (IovList *)q;
66 }
67 
DestroyIovList(const List * head,DFileSession * s,uint32_t tid)68 void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
69 {
70     List *p = NULL;
71     List *n = NULL;
72     BlockFrame *block = NULL;
73 
74     (void)s;
75     (void)tid;
76     LIST_FOR_EACH_SAFE(p, n, head) {
77         block = (BlockFrame *)p;
78         ListRemoveNode(p);
79         free(block->fileDataFrame);
80         free(block);
81     }
82 }
83 
TcpSendFileDataFrame(Socket * socket,PeerInfo * peerInfo,List * p,BlockFrame * block,uint16_t len)84 static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
85 {
86     int32_t ret;
87     DFileSession *session = peerInfo->session;
88 
89     ret = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);
90     if (ret > 0 && ret == (int32_t)(len - block->sendLen)) {
91         block->sendLen = 0;
92         ListRemoveNode(p);
93         free(block->fileDataFrame);
94         free(block);
95         NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
96         NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
97         NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
98     } else if (ret > 0) {
99         NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
100         block->sendLen = block->sendLen + (uint32_t)ret;
101         ret = NSTACKX_EAGAIN;
102     } else if (errno == EAGAIN) {
103         NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
104         ret = NSTACKX_EAGAIN;
105     } else {
106         DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
107         ret = NSTACKX_EFAILED;
108     }
109 
110     return ret;
111 }
112 
UdpSendFileDataSuccess(DFileSession * session,PeerInfo * peerInfo,List * p,FileDataFrameZS * f,BlockFrame * block)113 static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
114     BlockFrame *block)
115 {
116     ListRemoveNode(p);
117     free(f);
118     free(block);
119     NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
120     NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
121     NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
122 }
123 
SendFileDataFrame(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)124 static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
125 {
126     List *p = NULL;
127     List *n = NULL;
128     BlockFrame *block = NULL;
129     FileDataFrameZS *f = NULL;
130     int32_t ret;
131     uint16_t len;
132     Socket *socket = session->socket[0];
133 
134     if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
135         socket = session->acceptSocket;
136     }
137 
138     LIST_FOR_EACH_SAFE(p, n, head) {
139         block = (BlockFrame *)p;
140         f = (FileDataFrameZS *)(void *)block->fileDataFrame;
141         len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
142         if (CapsTcp(session)) {
143             ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
144             if (ret == NSTACKX_EFAILED) {
145                 break;
146             } else if (ret == NSTACKX_EAGAIN) {
147                 return ret;
148             }
149         } else {
150             ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
151             if (ret > 0) {
152                 UdpSendFileDataSuccess(session, peerInfo, p, f, block);
153             } else if (ret == NSTACKX_EAGAIN) {
154                 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
155                 return ret;
156             } else {
157                 DFILE_LOGE(TAG, "socket sendto failed");
158                 break;
159             }
160         }
161     }
162 
163     DestroyIovList(head, session, tid);
164 
165     return ret;
166 }
167 
SendFileDataFrameEx(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)168 static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
169 {
170     return SendFileDataFrame(session, peerInfo, head, tid);
171 }
172 
CheckUnsentList(List * unsent,List * head,int32_t maxCount)173 static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
174 {
175     int32_t cnt = 0;
176 
177     ListInitHead(head);
178     while (cnt < maxCount && !ListIsEmpty(unsent)) {
179         List *p = ListPopFront(unsent);
180         if (p == NULL) {
181             break;
182         }
183         ListInsertTail(head, p);
184         cnt++;
185     }
186 
187     return cnt;
188 }
189 
GetMaxSendCount(void)190 static int32_t GetMaxSendCount(void)
191 {
192     return 1;
193 }
194 
DoSendDataFrame(DFileSession * session,List * head,int32_t count,uint32_t tid,uint8_t socketIndex)195 static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
196 {
197     BlockFrame *block = NULL;
198     int32_t ret;
199     PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
200     if (!peerInfo) {
201         return NSTACKX_EFAILED;
202     }
203     int32_t maxCount = GetMaxSendCount();
204     int32_t flag;
205     do {
206         while (count < maxCount && FileManagerHasPendingData(session->fileManager)) {
207             ret = FileManagerFileRead(session->fileManager, tid, &block, maxCount - count);
208             if (ret < 0) {
209                 DFILE_LOGE(TAG, "FileManagerFileRead failed %d", ret);
210                 break;
211             }
212             if (ret == 0) {
213                 NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
214                 (void)usleep(WAIT_DATA_FRAME_WAIT_US);
215                 continue;
216             }
217             while (block) {
218                 List *next = block->list.next;
219                 ListInsertTail(head, &block->list);
220                 block = (BlockFrame *)(void *)next;
221             }
222             count += ret;
223         }
224 
225         if (count == 0) {
226             NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
227             ret = NSTACKX_EOK;
228             break;
229         }
230         ret = SendFileDataFrameEx(session, peerInfo, head, tid);
231         if (ret <= 0) {
232             break;
233         }
234 
235         count = 0;
236         maxCount = GetMaxSendCount();
237         flag = CapsTcp(session) ? (session->sendRemain ? 0 : 1) :
238             (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag);
239     } while (flag && (session->stopSendCnt[tid] == 0));
240     return ret;
241 }
242 
243 
244 /*
245  *  * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
246  *   * if backpress frame count is zero then send packet normally
247  *    */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)248 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
249 {
250     uint32_t fileProcessCnt;
251     uint32_t sleepTime;
252     uint32_t stopCnt;
253     PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
254     if (peerInfo == NULL) {
255         return;
256     }
257 
258     if (session->stopSendCnt[tid] != 0) {
259         if (PthreadMutexLock(&session->backPressLock) != 0) {
260             DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
261             return;
262         }
263 
264         stopCnt = session->stopSendCnt[tid];
265         if (stopCnt == 0) {
266             if (PthreadMutexUnlock(&session->backPressLock) != 0) {
267                 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
268             }
269             return;
270         }
271 
272         /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
273         fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
274 
275         session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
276             fileProcessCnt) : 0;
277 
278         if (PthreadMutexUnlock(&session->backPressLock) != 0) {
279             DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
280             return;
281         }
282 
283         sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
284 
285 #ifndef NSTACKX_WITH_LITEOS
286         DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
287              fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
288 #endif
289         (void)usleep(sleepTime);
290     }
291 }
292 
SendDataFrame(DFileSession * session,List * unsent,uint32_t tid,uint8_t socketIndex)293 int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
294 {
295     int32_t ret = NSTACKX_EOK;
296     PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
297     List tmpq;
298 
299     if (peerInfo == NULL) {
300         return NSTACKX_EFAILED;
301     }
302     if (peerInfo->amendSendRate == 0) {
303         return ret;
304     }
305 
306     CheckSendByBackPress(session, tid, socketIndex);
307 
308     int32_t maxCount = GetMaxSendCount();
309     int32_t count = CheckUnsentList(unsent, &tmpq, maxCount);
310     ret = DoSendDataFrame(session, &tmpq, count, tid, socketIndex);
311     if (ret == NSTACKX_EAGAIN) {
312         ListMove(&tmpq, unsent);
313     }
314     return ret;
315 }
316 
SendControlFrame(DFileSession * session,QueueNode * queueNode)317 int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
318 {
319     int32_t ret;
320     Socket *socket = NULL;
321 
322     if (CapsTcp(session)) {
323         socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ? session->acceptSocket : session->socket[0];
324         ret = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
325         if (ret > 0 && ret == (int32_t)(queueNode->length - queueNode->sendLen)) {
326             queueNode->sendLen = 0;
327         } else if (ret > 0) {
328             queueNode->sendLen = queueNode->sendLen + (uint32_t)ret;
329             ret = NSTACKX_EAGAIN;
330         } else if (errno == EAGAIN) {
331             ret = NSTACKX_EAGAIN;
332         } else {
333             DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
334             ret = NSTACKX_EFAILED;
335         }
336         return ret;
337     }
338 
339     uint8_t socketIndex = queueNode->socketIndex;
340     ret = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
341     if (ret <= 0) {
342         if (ret != NSTACKX_EAGAIN) {
343             DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
344             ret = NSTACKX_EFAILED;
345         }
346     }
347 
348     return ret;
349 }
350 
SendOutboundFrame(DFileSession * session,QueueNode ** preQueueNode)351 int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
352 {
353     QueueNode *queueNode = *preQueueNode;
354     int32_t ret;
355 
356     do {
357         if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
358             DFILE_LOGE(TAG, "Pthread mutex lock failed");
359             ret = NSTACKX_EFAILED;
360             break;
361         }
362         if (queueNode == NULL && session->outboundQueueSize) {
363             queueNode = (QueueNode *)ListPopFront(&session->outboundQueue);
364             session->outboundQueueSize--;
365         }
366         if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
367             DFILE_LOGE(TAG, "Pthread mutex unlock failed");
368             ret = NSTACKX_EFAILED;
369             break;
370         }
371         if (queueNode == NULL) {
372             ret = NSTACKX_EOK;
373             break;
374         }
375 
376         uint32_t socketIndex = queueNode->socketIndex;
377         if (session->socket[socketIndex]->protocol == NSTACKX_PROTOCOL_UDP &&
378             session->socket[socketIndex]->isServer == NSTACKX_TRUE) {
379             session->socket[socketIndex]->dstAddr = queueNode->peerAddr;
380         }
381 
382         ret = SendControlFrame(session, queueNode);
383         if (ret <= 0) {
384             break;
385         }
386         /* Send ok, try to get next frame. */
387         DestroyQueueNode(queueNode);
388         queueNode = NULL;
389         NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
390     } while (!session->closeFlag);
391 
392     if (ret == NSTACKX_EAGAIN) {
393         *preQueueNode = queueNode;
394     } else {
395         *preQueueNode = NULL;
396         DestroyQueueNode(queueNode);
397         queueNode = NULL;
398     }
399     return ret;
400 }
401 
TcpSocketRecv(DFileSession * session,uint8_t * buffer,size_t length,struct sockaddr_in * srcAddr,const socklen_t * addrLen)402 int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
403     const socklen_t *addrLen)
404 {
405     int32_t ret;
406     int recvLen = 0;
407 
408     Socket *socket = session->socket[0];
409 
410     if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
411         socket = session->acceptSocket;
412     }
413 
414     while (recvLen < (int32_t)length) {
415         ret = SocketRecv(socket, buffer + session->recvLen, length - (size_t)recvLen, srcAddr, addrLen);
416         if (ret == 0) {
417             return NSTACKX_PEER_CLOSE;
418         }
419         if (ret < 0) {
420             if (errno != EAGAIN) {
421                 ret = NSTACKX_EFAILED;
422                 return ret;
423             } else {
424                 return NSTACKX_EAGAIN;
425             }
426         }
427         recvLen = recvLen + ret;
428         session->recvLen = session->recvLen + (uint32_t)ret;
429     }
430 
431     return recvLen;
432 }
433 
SocketRecvForTcp(DFileSession * session,uint8_t * buffer,struct sockaddr_in * srcAddr,const socklen_t * addrLen)434 int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
435     const socklen_t *addrLen)
436 {
437     int32_t ret;
438     uint16_t payloadLen;
439     DFileFrameHeader *frameHeader = NULL;
440     size_t length = sizeof(DFileFrameHeader);
441     if (session->recvLen < length) {
442         ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
443         if (ret <= 0) {
444             return ret;
445         }
446     }
447 
448     frameHeader = (DFileFrameHeader *)(session->recvBuffer);
449     payloadLen = ntohs(frameHeader->length);
450     if (payloadLen >= NSTACKX_RECV_BUFFER_LEN) {
451         DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", length,
452              session->recvLen, payloadLen, frameHeader->type);
453         return NSTACKX_EFAILED;
454     }
455 
456     if ((session->recvLen - length) < payloadLen) {
457         ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
458         if (ret <= 0) {
459             return ret;
460         }
461     }
462 
463     return (int32_t)(session->recvLen);
464 }
465