• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nstackx_dfile_session.h"
17 
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_dfile_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 #include "nstackx_dfile_dfx.h"
38 
39 #define TAG "nStackXDFile"
40 
/*
 * Timing and limit constants for the DFile session.
 *
 * DEFAULT_NEGOTIATE_TIMEOUT / TCP_NEGOTIATE_TIMEOUT: timeout handed to the client
 *     setting timer; the TCP value is chosen when CapsTcp() reports TCP.
 * MAX_SERVER_NEGOTIATE_VALID_TIMEOUT: timeout handed to the server setting timer.
 * MAX_NEGOTIATE_TIMEOUT_COUNT: bound on setting retries (client) and on setting
 *     frames accepted from one peer (server).
 *
 * NOTE(review): the units are presumably milliseconds, as the *_MS names suggest -
 * confirm against the timer API. The remaining constants are not referenced in
 * this part of the file.
 */
41 #define DEFAULT_WAIT_TIME_MS               1000
42 #define MULTI_THREADS_SOCKET_WAIT_TIME_MS  1
43 #define DEFAULT_NEGOTIATE_TIMEOUT          1000
44 #define TCP_NEGOTIATE_TIMEOUT              10000
45 #define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
46 #define MAX_NEGOTIATE_TIMEOUT_COUNT        3
47 #define MAX_UNPROCESSED_READ_EVENT_COUNT   3
48 #define MAX_RECVBUF_COUNT                  130000
49 #define MAX_NOMEM_PRINT                    10000
50 
51 static void ReadEventHandle(void *arg);
52 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
53 DFileEventFunc g_dfileEventFunc;
54 
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)55 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
56     const struct sockaddr_in *peerAddr, uint8_t socketIndex)
57 {
58     QueueNode *queueNode = NULL;
59 
60     if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
61         return NULL;
62     }
63 
64     queueNode = calloc(1, sizeof(QueueNode));
65     if (queueNode == NULL) {
66         return NULL;
67     }
68 
69     queueNode->frame = calloc(1, length);
70     if (queueNode->frame == NULL) {
71         free(queueNode);
72         return NULL;
73     }
74     queueNode->length = length;
75     queueNode->sendLen = 0;
76 
77     if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
78         free(queueNode->frame);
79         free(queueNode);
80         return NULL;
81     }
82     if (peerAddr != NULL) {
83         if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
84             free(queueNode->frame);
85             free(queueNode);
86             return NULL;
87         }
88     }
89     queueNode->socketIndex = socketIndex;
90 
91     return queueNode;
92 }
93 
/*
 * Release a QueueNode together with the frame buffer it owns.
 * Passing NULL is a no-op.
 */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }
    free(queueNode->frame);
    free(queueNode);
}
101 
/*
 * Forward a message to the session's registered receiver callback.
 *
 * session - session whose msgReceiver is invoked; NULL is logged and ignored.
 * msgType - message type passed through to the callback.
 * msg     - message payload passed through to the callback.
 *
 * When no receiver is registered the call is logged and dropped.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        DFILE_LOGI(TAG, "session is NULL");
        return;
    }

    if (session->msgReceiver != NULL) {
        session->msgReceiver(session->sessionId, msgType, msg);
        return;
    }

    DFILE_LOGI(TAG, "msgReceiver is NULL");
}
116 
/*
 * Reset the session's transfer-rate statistics when the session is idle.
 *
 * Nothing is reset while a client session still has queued file lists (pending,
 * and small lists when NSTACKX_SMALL_FILE_SUPPORT is defined) or while any
 * transfer is still on dFileTransChain. Otherwise the byte and transfer
 * counters are zeroed and startTs is set to the current monotonic time.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }

    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    DFILE_LOGI(TAG, "begin to calculate transfer rate");
    session->transCount = 0;
    session->bytesTransferred = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
137 
138 #ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start one transfer from the queued small-file lists.
 *
 * Small lists are only considered when a normal file list is already being
 * processed or when no normal file list is pending. Lists are popped until one
 * starts successfully; each list that fails to start is reported to the
 * receiver with DFILE_ON_FILE_SEND_FAIL and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    if (!(session->fileListProcessingCnt > 0 || session->fileListPendingCnt == 0)) {
        return NSTACKX_FALSE;
    }

    for (;;) {
        if (ListIsEmpty(&session->smallFileLists)) {
            break;
        }
        FileListInfo *candidate = (FileListInfo *)ListPopFront(&session->smallFileLists);
        /* The pending counter is decremented for every pop, even if it yields NULL. */
        session->smallListPendingCnt--;
        if (candidate == NULL) {
            continue;
        }

        int32_t startRet = DFileStartTrans(session, candidate);
        if (startRet == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", startRet);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = startRet;
        failMsg.fileList.files = (const char **)candidate->files;
        failMsg.fileList.fileNum = candidate->fileNum;
        failMsg.fileList.userData = candidate->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(candidate);
    }
    return NSTACKX_FALSE;
}
166 #endif
167 
/*
 * Start the next pending file list of the session.
 *
 * Lists are popped from pendingFileLists until one is started successfully or
 * the queue is empty. A list that fails to start is reported to the receiver
 * with DFILE_ON_FILE_SEND_FAIL and destroyed before the next one is tried.
 */
static void SendPendingList(DFileSession *session)
{
    for (;;) {
        if (ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
        FileListInfo *candidate = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        /* The pending counter is decremented for every pop, even if it yields NULL. */
        session->fileListPendingCnt--;
        if (candidate == NULL) {
            continue;
        }

        int32_t startRet = DFileStartTrans(session, candidate);
        if (startRet == NSTACKX_EOK) {
            return;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", startRet);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = startRet;
        failMsg.fileList.files = (const char **)candidate->files;
        failMsg.fileList.fileNum = candidate->fileNum;
        failMsg.fileList.userData = candidate->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(candidate);
    }
}
192 
/*
 * Log the queue counters and start the next queued transfer.
 *
 * With NSTACKX_SMALL_FILE_SUPPORT, small-file lists are tried first and the
 * pending lists are used only when no small list was started. Without it,
 * only the pending lists are considered.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
208 
/*
 * Report session-wide progress to the message receiver.
 *
 * The total and transferred byte counts are read from the file manager. A
 * DFILE_ON_SESSION_IN_PROGRESS message is sent only when both reads succeed
 * and 0 < bytesTransferred <= totalBytes.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }
    if (progress.transferUpdate.bytesTransferred > progress.transferUpdate.totalBytes) {
        return;
    }
    if (progress.transferUpdate.bytesTransferred > 0) {
        NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
    }
}
220 
/*
 * Fill the transfer-progress fields of a terminal transfer message.
 *
 * Only FILE_RECEIVED, FILE_RECEIVE_FAIL, FILE_SEND_FAIL and FILE_SENT messages
 * are handled; any other type, or a NULL argument, leaves msg untouched.
 *
 * For the two success types the transfer's own total is used for both counters
 * when it is non-zero. Otherwise the counters come from the file manager, and
 * if that lookup fails only transferUpdate.transId has been written. A
 * FILE_SENT message additionally triggers DFILE_ON_TRANS_IN_PROGRESS, with the
 * transfer id replaced by vtransRealTransId when the file list has vtransFlag set.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
                                 DFileTransMsgType msgType, DFileTransMsg *msg)
{
    if (session == NULL || dFileTrans == NULL || msg == NULL) {
        return;
    }
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVED:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
            break;
        default:
            return;
    }

    msg->transferUpdate.transId = dFileTrans->transId;

    uint64_t total = 0;
    uint64_t done = 0;
    uint8_t resolved = NSTACKX_FALSE;

    if (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT) {
        total = DFileTransGetTotalBytes(dFileTrans);
        if (total > 0) {
            /* A completed transfer has moved everything it had to move. */
            done = total;
            resolved = NSTACKX_TRUE;
        }
    }

    if (!resolved &&
        FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &done) != NSTACKX_EOK) {
        return;
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = done;
    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
259 
/*
 * Wake the threads that send outbound data.
 *
 * The semaphore of the outbound queue for socketIndex is always posted. When
 * the transfer is a sender and the session has more than one client send
 * thread, the sendWait semaphore of the first (clientSendThreadNum - 1)
 * sendThreadPara entries is posted as well.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);

    if (!isSender || session->clientSendThreadNum <= 1) {
        return;
    }
    for (uint16_t idx = 0; idx < session->clientSendThreadNum - 1; idx++) {
        SemPost(&session->sendThreadPara[idx].sendWait);
    }
}
269 
/*
 * Accumulate the bytes of a finished transfer and, once the session is idle,
 * report the overall transfer rate.
 *
 * Only FILE_SENT and END messages are counted. bytesTransferred saturates at
 * UINT64_MAX. The rate is reported through DFILE_ON_SESSION_TRANSFER_RATE and
 * TransferCompleteEvent() only when no transfer remains on dFileTransChain, a
 * client session has no queued file lists left, and the time elapsed since
 * startTs is non-zero.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }

    /* Saturating add: clamp instead of wrapping around. */
    if (session->bytesTransferred > UINT64_MAX - totalBytes) {
        session->bytesTransferred = UINT64_MAX;
    } else {
        session->bytesTransferred += totalBytes;
    }
    session->transCount++;

    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }

    struct timespec now;
    ClockGetTime(CLOCK_MONOTONIC, &now);
    uint32_t elapsedMs = GetTimeDiffMs(&now, &session->startTs);
    if (elapsedMs == 0) {
        /* Avoid dividing by zero; no rate is reported in this case. */
        return;
    }

    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / elapsedMs;
    DFILE_LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
        session->transCount, session->bytesTransferred, elapsedMs, rate);

    DFileMsg rateMsg;
    (void)memset_s(&rateMsg, sizeof(rateMsg), 0, sizeof(rateMsg));
    rateMsg.rate = (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &rateMsg);

    TransferCompleteEvent(rate);
}
311 
/*
 * Tear down a transfer once it has reached a terminal message.
 *
 * Terminal messages are FILE_RECEIVE_FAIL, FILE_SENT, FILE_SEND_FAIL and END;
 * any other type is ignored. The transfer id is marked STATE_TRANS_DONE, the
 * owning peer's currentTransCount is decremented, and the transfer is unlinked
 * and destroyed. A client session then decrements the matching processing
 * counter and starts the next queued list. Finally the session transfer rate
 * is updated.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_END:
            break;
        default:
            return;
    }

    /* Everything still needed from the transfer is read before it is destroyed. */
    uint8_t isSmallList = dFileTrans->fileList->smallFlag;
    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans id state fail");
    }
    PeerInfo *owner = (PeerInfo *)dFileTrans->context;
    owner->currentTransCount--;
    ListRemoveNode(&dFileTrans->list);
    uint64_t transBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (isSmallList == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }

    CalculateSessionTransferRate(session, transBytes, msgType);
}
335 
/*
 * Callback invoked by a DFileTrans to report an event to its session.
 *
 * The transfer's context is the owning PeerInfo, through which the session is
 * reached. Progress fields of msg are refreshed first, then the event is
 * mapped to the matching user-facing notification (or to waking the send
 * threads / processing other transfers). Last, CheckTransDone() tears the
 * transfer down when the event is terminal.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
                              DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *owner = peer->session;

    UpdateMsgProcessInfo(owner, dFileTrans, msgType, msg);

    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            WakeSendThread(owner, dFileTrans->isSender, peer->socketIndex);
            break;
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            NotifyMsgRecver(owner, DFILE_ON_FILE_LIST_RECEIVED, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED:
            /* The receiver has got all the file data. */
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_FAIL, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SENT:
            /* The sender has sent the TRANSFER DONE ACK frame and is finished. */
            NoticeSessionProgress(owner);
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_FAIL, msg);
            break;
        case DFILE_TRANS_MSG_IN_PROGRESS:
            NotifyMsgRecver(owner, DFILE_ON_TRANS_IN_PROGRESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            ProcessSessionTrans(owner, dFileTrans->transId);
            break;
        default:
            DFILE_LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            break;
    }

    CheckTransDone(owner, dFileTrans, msgType);
}
377 
ServerSettingTimeoutHandle(void * data)378 static void ServerSettingTimeoutHandle(void *data)
379 {
380     PeerInfo *peerInfo = data;
381     ListRemoveNode(&peerInfo->list);
382     TimerDelete(peerInfo->settingTimer);
383     peerInfo->session->peerInfoCnt--;
384     free(peerInfo);
385     DFILE_LOGI(TAG, "DFileServer Setting Negotationion timeout");
386 }
387 
ClientSettingTimeoutHandle(void * data)388 static void ClientSettingTimeoutHandle(void *data)
389 {
390     PeerInfo *peerInfo = data;
391     uint8_t cnt = peerInfo->settingTimeoutCnt++;
392     DFileMsg msgData;
393     uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
394     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
395     if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
396         TimerDelete(peerInfo->settingTimer);
397         peerInfo->settingTimer = NULL;
398         peerInfo->settingTimeoutCnt = 0;
399         msgData.errorCode = NSTACKX_EFAILED;
400         DFILE_LOGI(TAG, "DFileClient Setting Negotationion timeout");
401         NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
402     } else {
403         DFileSessionSendSetting(peerInfo);
404         DFILE_LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
405         if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
406             msgData.errorCode = NSTACKX_EFAILED;
407             NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
408             DFILE_LOGE(TAG, "Timer setting timer fail");
409         }
410     }
411 }
412 
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)413 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
414 {
415     List *pos = NULL;
416     DFileTrans *trans = NULL;
417 
418     if (dFileTransChain == NULL || transId == 0) {
419         return NULL;
420     }
421 
422     LIST_FOR_EACH(pos, dFileTransChain) {
423         trans = (DFileTrans *)pos;
424         if (trans->transId == transId) {
425             return trans;
426         }
427     }
428     return NULL;
429 }
430 
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)431 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
432 {
433     List *pos = NULL;
434     PeerInfo *peerInfo = NULL;
435     LIST_FOR_EACH(pos, &session->peerInfoChain) {
436         peerInfo = (PeerInfo *)pos;
437         if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
438             peerInfo->session->sessionId == session->sessionId) {
439             return peerInfo;
440         }
441     }
442     return NULL;
443 }
444 
/*
 * Encode and send a setting frame to the peer, starting the setting timer
 * if it is not running yet.
 *
 * peerInfo - peer to negotiate with; its session, connType and localMtu are
 *            used to build the frame.
 *
 * If no setting timer exists, one is started first: a client uses the
 * negotiate timeout (the TCP value when CapsTcp() is true) with
 * ClientSettingTimeoutHandle, a server uses MAX_SERVER_NEGOTIATE_VALID_TIMEOUT
 * with ServerSettingTimeoutHandle. If the timer cannot be created nothing is
 * sent; a client additionally reports DFILE_ON_CONNECT_FAIL. Callers can
 * detect this case by settingTimer still being NULL.
 *
 * A write result other than the full frame length or NSTACKX_EAGAIN is
 * reported as DFILE_ON_CONNECT_FAIL.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    SettingFrame settingFramePara;

    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    /*
     * Zero the whole frame description: not every field is assigned below
     * (deviceBits, for one, is set only when a key is configured), and
     * indeterminate stack bytes must not be encoded and sent to the peer.
     */
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = peerInfo->session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    if (peerInfo->session->fileManager->keyLen) {
        DFileGetCipherCaps(peerInfo->session, &settingFramePara);
        settingFramePara.deviceBits = DFileGetDeviceBits();
    }
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    if (peerInfo->settingTimer == NULL) {
        if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
            peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, timeout,
                                                NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "setting timmer creat fail");
                data.errorCode = NSTACKX_EFAILED;
                NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
                return;
            }
        } else {
            peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                                                NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "server setting timer create fail");
                return;
            }
        }
    }

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
    }
}
496 
/*
 * Apply a negotiated DFileConfig to a peer and to the session's file manager.
 *
 * session     - session whose file manager receives the frame length and, for
 *               a server session, the receive parameters for connType.
 * dFileConfig - source of sendRate and dataFrameSize.
 * connType    - connection type; selects the initial send-rate divisor.
 * peerInfo    - peer whose rate-control state is reset and reconfigured.
 *
 * The initial sendRate is never set below NSTACKX_MIN_SENDRATE. Failures of
 * the file manager setters are logged and otherwise ignored.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    /* Reset the rate-control state of the peer. */
    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;

    /* Start from a fraction of the configured rate, depending on the link type. */
    if (connType != CONNECT_TYPE_WLAN) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    /* On WLAN without a known wifi rate, derive the start rate from the default WLAN rate. */
    if ((connType == CONNECT_TYPE_WLAN) && (!peerInfo->gotWifiRate)) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set max frame length");
    }

    DFILE_LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
        connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    if (FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set recv para");
    }
}
533 
/*
 * Record the values announced in a decoded setting frame on the peer.
 *
 * Stores the negotiation state, the peer's MTU, connection type and session
 * id, and sets mtuInuse to the smaller of the local and the peer MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    peerInfo->state = state;
    peerInfo->connType = settingFrame->connType;
    peerInfo->remoteSessionId = settingFrame->header.sessionId;

    peerInfo->mtu = settingFrame->mtu;
    uint16_t ownMtu = peerInfo->localMtu;
    if (ownMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = ownMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }
}
543 
/*
 * Client-side handling of a setting frame answered by the server.
 *
 * session    - client session that received the frame.
 * dFileFrame - raw frame; decoded as a SettingFrame.
 * peerAddr   - address the frame came from; must belong to a known peer.
 *
 * A frame that fails to decode or carries an MTU of 0 is dropped, as is a
 * frame from an unknown peer. Otherwise the setting timer is stopped, the
 * peer is marked SETTING_NEGOTIATED, the negotiated MTU is pushed to every
 * transfer of the session and DFILE_ON_CONNECT_SUCCESS is reported. The
 * session configuration, the link-sequence capability and the cipher type
 * are then updated from the server's answer.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    /* unsupport version */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }
    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer setting, maybe be attacked");
        return;
    }
    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
    /* The negotiation is answered: stop the retry timer. */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "set trans mtu failed");
        }
    }
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }
    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* Bit set means the server accepted Link Sequence; the capability is kept. */
        DFILE_LOGI(TAG, "server replies to enable Link Sequence");
    } else {
        DFILE_LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
    DFileChooseCipherType(&hostSettingFrame, session);
}
587 
/*
 * Return the local MTU to announce for a socket protocol.
 *
 * UDP and TCP use NSTACKX_DEFAULT_FRAME_SIZE. D2D is not supported: an error
 * is logged and 0 is returned. Any other protocol value also yields 0.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    switch (protocol) {
        case NSTACKX_PROTOCOL_UDP:
        case NSTACKX_PROTOCOL_TCP:
            return NSTACKX_DEFAULT_FRAME_SIZE;
        case NSTACKX_PROTOCOL_D2D:
            /* D2D would need the D2D MTU interface. */
            DFILE_LOGE(TAG, "d2d not support");
            return 0;
        default:
            return 0;
    }
}
602 
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)603 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
604     uint16_t connType, uint8_t socketIndex)
605 {
606     if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
607         return NULL;
608     }
609     PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
610     if (peerInfo == NULL) {
611         return NULL;
612     }
613     peerInfo->session = session;
614     peerInfo->dstAddr = *dstAddr;
615     peerInfo->connType = connType;
616     peerInfo->socketIndex = socketIndex;
617     peerInfo->localMtu = DFileGetMTU(session->protocol);
618     session->peerInfoCnt++;
619     peerInfo->gotWifiRate = 0;
620     peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
621     peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
622 
623     if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
624         peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
625         DFILE_LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
626     }
627 
628     if (peerMtu == 0) {
629         return peerInfo;
630     }
631     peerInfo->mtu = peerMtu;
632     peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
633     return peerInfo;
634 }
635 
/*
 * Server-side evaluation of the capabilities announced in a client's
 * setting frame.
 *
 * session          - server session whose capability / capsCheck bits are updated.
 * hostSettingFrame - decoded setting frame from the client.
 *
 * Link Sequence is dropped from the session capability when the client did
 * not announce it. Receive feedback is announced by the sender in the
 * frame's capsCheck field (its capability field carries only the Link
 * Sequence bit), so capsCheck is what is tested here; the session's own
 * capsCheck bit is set or cleared to match.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    if (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        DFILE_LOGI(TAG, "client wants to enable Link Sequence");
    } else {
        DFILE_LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }

    if (hostSettingFrame->capsCheck & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) {
        DFILE_LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        DFILE_LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
653 
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)654 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
655     uint8_t socketIndex)
656 {
657     PeerInfo *peerInfo = NULL;
658 
659     peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
660     if (peerInfo == NULL) {
661         return NULL;
662     }
663 
664     peerInfo->remoteSessionId = frame->header.sessionId;
665     peerInfo->state = SETTING_NEGOTIATING;
666     DFileSessionSendSetting(peerInfo);
667     if (peerInfo->settingTimer == NULL) {
668         free(peerInfo);
669         session->peerInfoCnt--;
670         return NULL;
671     }
672     ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
673     return peerInfo;
674 }
675 
/*
 * Server-side handling of a setting frame sent by a client.
 *
 * A frame that fails to decode or carries an MTU of 0 is dropped. For a known
 * peer the frame is answered again unless MAX_NEGOTIATE_TIMEOUT_COUNT setting
 * frames were already accepted from it, in which case it is dropped. For an
 * unknown peer a new PeerInfo is allocated (which also sends the answer).
 * After a frame is accepted, the peer's setting counter is incremented and the
 * session configuration, capabilities and cipher type are updated from it.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame decoded;
    DFileConfig config;
    (void)memset_s(&decoded, sizeof(decoded), 0, sizeof(decoded));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &decoded) != NSTACKX_EOK || decoded.mtu == 0) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        /* First setting from this address: create the peer and answer. */
        peerInfo = AllocPeerInfo(session, peerAddr, &decoded, socketIndex);
        if (peerInfo == NULL) {
            return;
        }
    } else if (peerInfo->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
        DFILE_LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
        return;
    } else {
        /* Repeated setting from a known peer: refresh and answer again. */
        DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATING, &decoded);
        DFileSessionSendSetting(peerInfo);
    }

    peerInfo->settingTimeoutCnt++;
    DFILE_LOGI(TAG, "DFileServer response Setting Frame. count %u", peerInfo->settingTimeoutCnt);
    peerInfo->remoteDFileVersion = decoded.dFileVersion;
    if (GetDFileConfig(&config, peerInfo->mtuInuse, decoded.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, decoded.connType, peerInfo);
    }
    HandleLinkSeqCap(session, &decoded);
    DFileChooseCipherType(&decoded, session);
}
715 
/*
 * Dispatch a received setting frame to the server or client handler
 * according to the session type. Other session types are ignored.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    switch (session->sessionType) {
        case DFILE_SESSION_TYPE_SERVER:
            DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
            break;
        case DFILE_SESSION_TYPE_CLIENT:
            DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
            break;
        default:
            break;
    }
}
727 
/*
 * Handle a peer RST whose error code is NSTACKX_DFILE_WITHOUT_SETTING_ERROR.
 * The RST is dropped when the sender is not a known peer. Otherwise the MTU of
 * every transfer in the session is set to 0 and the Setting frame is sent again
 * to the first peer entry whose address and session id match.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *node = NULL;

    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer rst, maybe be attacked");
        return;
    }

    /*
     * When the client receives this RST, set the MTU of each trans to 0;
     * it will be set again after the new setting negotiation.
     */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        if (DFileTransSetMtu((DFileTrans *)node, 0) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    LIST_FOR_EACH(node, &session->peerInfoChain) {
        PeerInfo *peer = (PeerInfo *)node;
        if (memcmp(&peer->dstAddr, peerAddr, sizeof(struct sockaddr_in)) != 0 ||
            peer->session->sessionId != session->sessionId) {
            continue;
        }
        /* Drop the pending setting timer and restart the negotiation for this peer. */
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
        peer->state = SETTING_NEGOTIATING;
        DFILE_LOGD(TAG, "Send Setting Frame");
        DFileSessionSendSetting(peer);
        break;
    }
}
763 
/*
 * Decode a received RST frame and act on its error code.
 * Only NSTACKX_DFILE_WITHOUT_SETTING_ERROR is handled; every other code is just logged.
 * A frame that fails to decode is ignored.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t rstCode = 0;
    if (DecodeRstFrame((RstFrame *)dFileFrame, &rstCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    /* The header carries the transfer id in network byte order. */
    uint16_t rstTransId = ntohs(dFileFrame->header.transId);
    DFILE_LOGD(TAG, "handle RST (%hu) frame, transId %hu", rstCode, rstTransId);

    if (rstCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
        return;
    }
    DFILE_LOGE(TAG, "Unspported error code %hu", rstCode);
}
783 
/*
 * Apply a decoded back-pressure report to the first `count` entries of
 * session->stopSendCnt while holding session->backPressLock.
 * When backPress.recvListOverIo is 1 each counter is incremented, otherwise each
 * counter is cleared. Nothing is changed if the lock cannot be taken.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    const int overIo = (backPress.recvListOverIo == 1);
    for (uint32_t slot = 0; slot < count; slot++) {
        if (overIo) {
            session->stopSendCnt[slot]++;
        } else {
            session->stopSendCnt[slot] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
810 
/*
 * Handle a received back-pressure frame.
 * The frame is accepted only from a known peer and only if it decodes correctly;
 * the decoded values are then applied to session->clientSendThreadNum counters
 * through DFileSessionResolveBackPress and logged.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    DataBackPressure pressure;

    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return;
    }
    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &pressure) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, pressure, session->clientSendThreadNum);

    DFILE_LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
        pressure.recvListOverIo, pressure.recvBufThreshold,
        pressure.stopSendPeriod);
}
833 
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)834 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
835 {
836     DFileTransPara transPara;
837 
838     if (peerInfo == NULL) {
839         return NULL;
840     }
841     (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
842     transPara.isSender = isSender;
843     transPara.transId = transId; /* for receiver, use transId of sender */
844     transPara.fileManager = session->fileManager;
845     transPara.writeHandle = DFileWriteHandle;
846     transPara.msgReceiver = DTransMsgReceiver;
847     transPara.connType = peerInfo->connType;
848     transPara.context = peerInfo;
849     transPara.session = session;
850     transPara.onRenameFile = session->onRenameFile;
851 
852     ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
853     return DFileTransCreate(&transPara);
854 }
855 
/*
 * Deliver a received frame to the transfer it belongs to.
 * The sender must be a known peer and the frame must carry a non-zero transId.
 * If no transfer with that id exists, a receiver-side transfer is created, but
 * only for a file HEADER frame whose transId is not already marked done.
 * Returns the result of HandleDFileFrame, or NSTACKX_EFAILED on any rejection.
 */
static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        DFILE_LOGI(TAG, "can't find peerInfo");
        return NSTACKX_EFAILED;
    }

    /* On a server, the first frame after the Setting exchange completes the negotiation. */
    if (peer->session->sessionType == DFILE_SESSION_TYPE_SERVER && peer->state == SETTING_NEGOTIATING) {
        peer->state = SETTING_NEGOTIATED;
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
    }

    uint16_t frameTransId = ntohs(dFileFrame->header.transId);
    if (frameTransId == 0) {
        DFILE_LOGE(TAG, "transId is 0");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), frameTransId);
    if (trans != NULL) {
        return HandleDFileFrame(trans, dFileFrame);
    }

    /* Only HEADER frame can start dfile transfer (receiver) */
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
        DFILE_LOGE(TAG, "trans %u is NULL && type is %u", frameTransId, dFileFrame->header.type);
        return NSTACKX_EFAILED;
    }
    if (IsTransIdDone(session, frameTransId) == NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }

    trans = CreateTrans(frameTransId, session, peer, NSTACKX_FALSE);
    if (trans == NULL) {
        /* Creation failure is reported to the message receiver as a fatal out-of-memory error. */
        DFileMsg fatalMsg;
        (void)memset_s(&fatalMsg, sizeof(fatalMsg), 0, sizeof(fatalMsg));
        fatalMsg.errorCode = NSTACKX_ENOMEM;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &fatalMsg);
        DFILE_LOGE(TAG, "trans is NULL");
        return NSTACKX_EFAILED;
    }

    /* A failure to set the MTU is only logged; the transfer is still registered. */
    if (DFileTransSetMtu(trans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    CalculateSessionTransferRatePrepare(session);
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    peer->currentTransCount++;

    return HandleDFileFrame(trans, dFileFrame);
}
905 
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)906 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
907 {
908     PeerInfo *peerInfo = context;
909     DFileSession *session = peerInfo->session;
910     QueueNode *queueNode = NULL;
911     struct sockaddr_in peerAddr;
912 
913     peerAddr = peerInfo->dstAddr;
914     queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
915     if (queueNode == NULL) {
916         return NSTACKX_ENOMEM;
917     }
918 
919     if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
920         DFILE_LOGE(TAG, "pthread mutex lock failed");
921         free(queueNode->frame);
922         free(queueNode);
923         return NSTACKX_EFAILED;
924     }
925     ListInsertTail(&session->outboundQueue, &queueNode->list);
926     session->outboundQueueSize++;
927     if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
928         /* Don't need to free node, as it's mount to outboundQueue */
929         DFILE_LOGE(TAG, "pthread mutex unlock failed");
930         return NSTACKX_EFAILED;
931     }
932     SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
933     return (int32_t)len;
934 }
935 
BindMainLoopToTargetCpu(void)936 static void BindMainLoopToTargetCpu(void)
937 {
938     int32_t cpu;
939     int32_t cpus = GetCpuNum();
940     if (cpus >= FIRST_CPU_NUM_LEVEL) {
941         return;
942     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
943         cpu = CPU_IDX_0;
944     } else {
945         return;
946     }
947     StartThreadBindCore(cpu);
948 }
949 
/*
 * Compute the timeout handed to the epoll loop.
 * Returns 0 when the main loop reads actively and inbound data is queued.
 * Otherwise returns the smallest non-negative timeout reported by any transfer,
 * capped at DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    /* Starting from the default keeps the result at or below DEFAULT_WAIT_TIME_MS. */
    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t candidate = DFileTransGetTimeout((DFileTrans *)node);
        if (candidate < 0 || candidate >= shortest) {
            continue;
        }
        shortest = candidate;
    }
    return shortest;
}
972 
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)973 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
974 {
975     List *tmp = NULL;
976     List *pos = NULL;
977     LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
978         DFileTrans *trans = (DFileTrans *)pos;
979         if (trans->transId != exceptTransId) {
980             DFileTransProcess(trans);
981         }
982     }
983 }
984 
NotifyBindPort(const DFileSession * session)985 static void NotifyBindPort(const DFileSession *session)
986 {
987     DFileMsg data;
988     int32_t socketNum;
989     (void)memset_s(&data, sizeof(data), 0, sizeof(data));
990     socketNum = 1;
991 
992     for (int i = 0; i < socketNum; i++) {
993         data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
994         data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
995     }
996     NotifyMsgRecver(session, DFILE_ON_BIND, &data);
997 }
998 
DFileMainLoop(void * arg)999 void *DFileMainLoop(void *arg)
1000 {
1001     DFileSession *session = arg;
1002     int32_t ret = NSTACKX_EOK;
1003     DFileMsg msgData;
1004     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
1005     uint8_t isBind = NSTACKX_FALSE;
1006     DFILE_LOGI(TAG, "main thread start");
1007     SetThreadName(DFFILE_MAIN_THREAD_NAME);
1008     SetMaximumPriorityForThread();
1009     SetTidToBindInfo(session, POS_MAIN_THERD_START);
1010     NotifyBindPort(session);
1011     while (!session->closeFlag) {
1012         int64_t minTimeout = GetEpollWaitTimeOut(session);
1013         ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
1014         if (ret == NSTACKX_EFAILED) {
1015             DFILE_LOGE(TAG, "epoll wait failed");
1016             break;
1017         }
1018         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1019             BindMainLoopToTargetCpu();
1020             isBind = NSTACKX_TRUE;
1021         }
1022         ProcessSessionTrans(session, 0);
1023         if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
1024             session->partReadFlag = NSTACKX_TRUE;
1025             ReadEventHandle(session);
1026             session->partReadFlag = NSTACKX_FALSE;
1027         }
1028     }
1029 
1030     if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
1031         msgData.errorCode = NSTACKX_EFAILED;
1032         NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
1033     }
1034 
1035     /* Notify sender thread to terminate */
1036     PostOutboundQueueWait(session);
1037 
1038     /* Unblock "select" and notify receiver thread to terminate */
1039     NotifyPipeEvent(session);
1040     return NULL;
1041 }
1042 
/*
 * Reset the peer's amended send rate to its current sendRate and log the
 * send rate, the measured send frame rate and the new amended rate.
 */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    peerInfo->amendSendRate = peerInfo->sendRate;

    DFILE_LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
        peerInfo->sendRate,
        peerInfo->sendFrameRate,
        peerInfo->amendSendRate);
}
1049 
/*
 * Recompute and log the sending statistics of one peer (client sessions only).
 * Nothing happens until more than peerInfo->rateStateInterval microseconds have
 * elapsed since peerInfo->startTime. Then:
 *   - the peer with socketIndex 0 refreshes the file manager's IO read rate and
 *     clears its iorBytes / sendListFullTimes counters;
 *   - the peer's send frame rate, MB/s rate and average qdisc value are computed;
 *   - a statistics line is logged, the amended send rate is reset, the session and
 *     peer statistics are cleared and a new measuring period starts.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    uint64_t sendCount;

    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* just calculate io rate; measureElapse is non-zero past this check, so the divisions are safe */
    if (measureElapse <= peerInfo->rateStateInterval) {
        return;
    }

    /* ONLY PEER 0 calculate io rate */
    sendCount = (uint64_t)peerInfo->sendCount;
    if (peerInfo->socketIndex == 0) {
        session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        DFILE_LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
            session->fileManager->sendListFullTimes);
        session->fileManager->sendListFullTimes = 0;
        session->fileManager->iorBytes = 0;
    }

    peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
    peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
        NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
    /* qdiscAveLeft holds the sum of the samples until it is turned into the average here. */
    if (peerInfo->qdiscSearchNum != 0) {
        peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
    }

    /*
     * The format pieces are concatenated by the compiler: "ave qdisc %u " needs its
     * trailing space, otherwise the value fuses with the following "totalRecvBlocks" label.
     * The widened local sendCount is passed for %llu so the argument is 64-bit wide
     * whatever the declared width of the peerInfo field is.
     */
    DFILE_LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu,"
              "measureElapse %llu sendFrameRate %u %uMB/s,"
              "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u,"
              "min qdisc %u max qdisc %u search num %u ave qdisc %u "
              "totalRecvBlocks %llu socket:%u "
              "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
        peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
        sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
        NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
        NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
        NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
        peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
        NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
        peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);

    /* Start a fresh measuring period. */
    AmendPeerInfoSendRate(peerInfo);
    ClearSessionStats(session);
    ClearPeerinfoStats(peerInfo);
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
}
1101 
/*
 * Store in peerInfo->allDtransRetryCount the sum of retryCount over every
 * transfer of the session.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t retryTotal = 0;
    List *node = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        retryTotal += ((DFileTrans *)node)->retryCount;
    }
    peerInfo->allDtransRetryCount = retryTotal;
}
1113 
/*
 * Account one received file DATA frame on a server session and refresh the
 * receive statistics once per measuring interval.
 * Two independent periods are tracked, both compared with peerInfo->rateStateInterval:
 *   - since session->measureBefore: IO write rate, iowCount and iowMaxRate of the file manager;
 *   - since peerInfo->startTime: receive frame rate and MB/s rate of the peer.
 * Frames of other types, non-server sessions and unknown peers are ignored.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        return;
    }

    /* The control frame timeout depends on the connection type of the peer. */
    uint32_t timeOut;
    if (peerInfo->connType == CONNECT_TYPE_P2P) {
        timeOut = NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT;
    } else {
        timeOut = NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;
    }

    peerInfo->recvCount++;

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);

    /* Session-wide IO write statistics. */
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
    if (measureElapse > peerInfo->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
        session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
            (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        DFILE_LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
            session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    /* Per-peer receive statistics. */
    measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    if (measureElapse > peerInfo->rateStateInterval) {
        uint64_t recvCount = (uint64_t)peerInfo->recvCount;
        peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        peerInfo->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1160 
CheckElapseTime(const struct timespec * before,uint64_t overRun)1161 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1162 {
1163     struct timespec now;
1164     ClockGetTime(CLOCK_MONOTONIC, &now);
1165     uint64_t elapseUs = GetTimeDiffUs(&now, before);
1166     elapseUs += overRun;
1167     if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1168         if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1169             DFILE_LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1170         }
1171         return 0;
1172     } else {
1173         uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1174         return delta;
1175     }
1176 }
1177 
BindClientSendThreadToTargetCpu(uint32_t idx)1178 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1179 {
1180     int32_t cpu;
1181     int32_t cpus = GetCpuNum();
1182     if (cpus >= FIRST_CPU_NUM_LEVEL) {
1183         return;
1184     } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1185         cpu = CPU_IDX_1 + (int32_t)idx;
1186     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1187         cpu = CPU_IDX_1;
1188     } else {
1189         return;
1190     }
1191     if (cpu > cpus) {
1192         cpu = cpus - 1;
1193     }
1194     StartThreadBindCore(cpu);
1195 }
1196 
/*
 * For a client session, restart the measuring period of the peer served by
 * socketIndex and bind the calling send thread to its CPU (offset 0).
 * Does nothing for other session types or when no peer is found.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        return;
    }

    ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
    BindClientSendThreadToTargetCpu(0);
}
1208 
TerminateMainThreadFatalInner(void * arg)1209 static void TerminateMainThreadFatalInner(void *arg)
1210 {
1211     DFileSession *session = (DFileSession *)arg;
1212     DFileSessionSetFatalFlag(session);
1213 }
1214 
/*
 * Ask for the session's fatal flag to be set by posting
 * TerminateMainThreadFatalInner as an event on the session's epoll fd.
 * If the event cannot be posted, the flag is set directly from the calling thread.
 */
static void PostFatalEvent(DFileSession *session)
{
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session) == NSTACKX_EOK) {
        return;
    }

    DFILE_LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1222 
GetSocketWaitMs(uint32_t threadNum)1223 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1224 {
1225     if (threadNum > 1) {
1226         return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1227     }
1228     return DEFAULT_WAIT_TIME_MS;
1229 }
1230 
SetSendThreadName(uint32_t threadIdx)1231 static void SetSendThreadName(uint32_t threadIdx)
1232 {
1233     char name[MAX_THREAD_NAME_LEN] = {0};
1234     if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1235         DFILE_LOGE(TAG, "sprintf send thead name failed");
1236     }
1237     SetThreadName(name);
1238 }
1239 
/* Start-up argument handed to an additional sender thread. */
typedef struct {
    struct DFileSession *session; /* session the thread sends for */
    uint32_t threadIdx;           /* index of the thread's entry in session->sendThreadPara */
} SendThreadCtx;
1244 
DFileAddiSenderHandle(void * arg)1245 static void *DFileAddiSenderHandle(void *arg)
1246 {
1247     SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
1248     struct DFileSession *session = sendThreadCtx->session;
1249     uint32_t threadIdx = sendThreadCtx->threadIdx;
1250     free(sendThreadCtx);
1251     int32_t ret = NSTACKX_EOK;
1252     DFILE_LOGI(TAG, "send thread %u start", threadIdx);
1253     SetSendThreadName(threadIdx);
1254     SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
1255     List unsent;
1256     uint8_t canWrite = NSTACKX_FALSE;
1257     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1258     BindClientSendThreadToTargetCpu(threadIdx + 1);
1259     ListInitHead(&unsent);
1260     while (!session->addiSenderCloseFlag) {
1261         if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
1262             NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1263             SemWait(&session->sendThreadPara[threadIdx].sendWait);
1264             if (session->addiSenderCloseFlag) {
1265                 break;
1266             }
1267         }
1268         if (ret == NSTACKX_EAGAIN) {
1269             ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
1270             if (ret != NSTACKX_EOK || session->closeFlag) {
1271                 break;
1272             }
1273             if (!canWrite) {
1274                 ret = NSTACKX_EAGAIN;
1275                 continue;
1276             }
1277         }
1278         ret = SendDataFrame(session, &unsent, threadIdx, 0);
1279         if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
1280             break;
1281         }
1282         if (ret == NSTACKX_EAGAIN) {
1283             continue;
1284         }
1285         SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
1286     }
1287     if (ret < 0 && ret != NSTACKX_EAGAIN) {
1288         PostFatalEvent(session);
1289     }
1290     DestroyIovList(&unsent, session, threadIdx);
1291     return NULL;
1292 }
1293 
CloseAddiSendThread(struct DFileSession * session)1294 static void CloseAddiSendThread(struct DFileSession *session)
1295 {
1296     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1297         return;
1298     }
1299     session->addiSenderCloseFlag = NSTACKX_TRUE;
1300     for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1301         SemPost(&session->sendThreadPara[i].sendWait);
1302         SemPost(&session->sendThreadPara[i].semNewCycle);
1303         PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1304         SemDestroy(&session->sendThreadPara[i].sendWait);
1305         SemDestroy(&session->sendThreadPara[i].semNewCycle);
1306     }
1307 }
1308 
ErrorHandle(struct DFileSession * session,uint16_t cnt)1309 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1310 {
1311     while (cnt > 0) {
1312         SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1313         SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1314         PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1315         SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1316         SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1317         cnt--;
1318     }
1319 }
1320 
CreateAddiSendThread(struct DFileSession * session)1321 static int32_t CreateAddiSendThread(struct DFileSession *session)
1322 {
1323     uint16_t i;
1324     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1325         return NSTACKX_EOK;
1326     }
1327     session->addiSenderCloseFlag = NSTACKX_FALSE;
1328     for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1329         SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1330         if (sendThreadCtx == NULL) {
1331             goto L_ERR_SENDER_THREAD;
1332         }
1333 
1334         if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1335             free(sendThreadCtx);
1336             goto L_ERR_SENDER_THREAD;
1337         }
1338 
1339         if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1340             SemDestroy(&session->sendThreadPara[i].sendWait);
1341             free(sendThreadCtx);
1342             goto L_ERR_SENDER_THREAD;
1343         }
1344 
1345         sendThreadCtx->session = session;
1346         sendThreadCtx->threadIdx = i;
1347         if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1348             DFILE_LOGE(TAG, "Create sender thread failed");
1349             SemDestroy(&session->sendThreadPara[i].sendWait);
1350             SemDestroy(&session->sendThreadPara[i].semNewCycle);
1351             free(sendThreadCtx);
1352             goto L_ERR_SENDER_THREAD;
1353         }
1354     }
1355     return NSTACKX_EOK;
1356 
1357 L_ERR_SENDER_THREAD:
1358     session->addiSenderCloseFlag = NSTACKX_TRUE;
1359     ErrorHandle(session, i);
1360     return NSTACKX_EFAILED;
1361 }
1362 
/*
 * Fold one qdisc sample into the peer's statistics: minimum, maximum, sample
 * count and running sum (kept in qdiscAveLeft).
 * A stored minimum of 0 is treated as "not set yet" and replaced by the sample.
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }
    if (peerInfo->qdiscMaxLeft < qDiscLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1376 
/*
 * Limit the peer's amended send rate by the value GetQdiscLen reports for the
 * root queue of the peer's local interface.
 * The reported value is divided by the number of MTUs per data frame (at least 1)
 * and the amended rate becomes the smaller of that quotient and sendRate.
 * Nothing changes when the query fails or mtuInuse is 0.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t queueLeft = 0;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &queueLeft) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, queueLeft);

    uint32_t rateByQdisc = queueLeft / mtusPerFrame;
    if (peerInfo->sendRate < rateByQdisc) {
        peerInfo->amendSendRate = peerInfo->sendRate;
    } else {
        peerInfo->amendSendRate = (int32_t)rateByQdisc;
    }
}
1400 
/*
 * Bookkeeping at the end of one send cycle on socketIndex.
 * Marks the cycle as finished. If the frames sent in this interval reached
 * curAmendSendRate, the sender is paced through CheckElapseTime (the overrun is
 * first cleared when decreaseStatus was 1) and sleepTimes is incremented when no
 * overrun remains. Finally the interval counter is cleared and the amended send
 * rate is refreshed from the qdisc state.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    int quotaReached = (curAmendSendRate > 0) &&
        (peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate);
    if (quotaReached) {
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->decreaseStatus = 0;
            peerInfo->overRun = 0;
        }
        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1419 
/*
 * One pass of the sender thread for socketIndex.
 *   1. With the TCP capability and leftover unsent data, flush that data first
 *      (sendRemain is set while doing so).
 *   2. Send the queued outbound frames; non-client sessions stop here.
 *   3. For a client with a negotiated MTU, send data frames if step 2 succeeded,
 *      refresh the rate statistics, pace the cycle and release the additional
 *      sender threads for a new cycle.
 * Returns the result of the last send step, or NSTACKX_EFAILED when the client
 * peer cannot be found.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
                                     struct timespec *before, uint8_t socketIndex)
{
    int32_t sendRet;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        session->sendRemain = 1;
        sendRet = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if ((sendRet == NSTACKX_EFAILED || sendRet == NSTACKX_EAGAIN) || session->closeFlag) {
            DFILE_LOGI(TAG, "ret is %d", sendRet);
            return sendRet;
        }
    }

    sendRet = SendOutboundFrame(session, preQueueNode);
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return sendRet;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    /* No data frames are sent while the MTU in use is 0. */
    if (peer->mtuInuse == 0) {
        return sendRet;
    }

    if (sendRet == NSTACKX_EOK) {
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        sendRet = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }

    /* Take the rate before DFileSendCalculateRate may reset it for the next period. */
    int32_t rateSnapshot = peer->amendSendRate;
    DFileSendCalculateRate(session, peer);

    if ((sendRet == NSTACKX_EFAILED || sendRet == NSTACKX_EAGAIN) || session->closeFlag) {
        DFILE_LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", sendRet, peer->intervalSendCount);
        return sendRet;
    }

    UpdateInfoAfterSend(session, peer, rateSnapshot, before, socketIndex);

    /* Let every additional sender thread start its next cycle. */
    for (uint16_t idx = 0; idx < session->clientSendThreadNum - 1; idx++) {
        SemPost(&session->sendThreadPara[idx].semNewCycle);
    }
    return sendRet;
}
1469 
/*
 * Block the sender on outboundQueueWait[socketIndex] when there is nothing to send: no pending queue node,
 * an empty "unsent" list, no pending data in the file manager and an empty outbound queue.
 * noPendingDataTimes is incremented each time the thread is about to wait.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL || !ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager) || session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1478 
/*
 * One-time setup for a sender thread: name the thread, refresh the sender measure time, raise the thread
 * priority and, when the computed slot is within the sender range, record the thread id in the bind info.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t bindSlot = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    if (bindSlot < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, bindSlot);
    }
}
1491 
/*
 * Tear down a sender thread: log the total number of sent blocks, close the additional send threads,
 * destroy the leftover "unsent" list and pending queue node, and free the thread argument.
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    DFILE_LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    CloseAddiSendThread(session);
    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    /* arg is the thread parameter handed to DFileSenderHandle(); it is released here. */
    free(arg);
}
1501 
DFileSenderHandle(void * arg)1502 void *DFileSenderHandle(void *arg)
1503 {
1504     DFileSession *session = ((SenderThreadPara*)arg)->session;
1505     uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1506     QueueNode *queueNode = NULL;
1507     List unsent;
1508     int32_t ret = NSTACKX_EOK;
1509     struct timespec before;
1510     uint8_t canWrite = NSTACKX_FALSE;
1511     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1512     uint8_t isBind = NSTACKX_FALSE;
1513 
1514     if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1515         PostFatalEvent(session);
1516         return NULL;
1517     }
1518     ListInitHead(&unsent);
1519     DFileSenderPre(session, socketIndex);
1520     while (!session->closeFlag) {
1521         WaitNewSendData(queueNode, &unsent, session, socketIndex);
1522         if (session->closeFlag) {
1523             break;
1524         }
1525         if (!session->cycleRunning[socketIndex]) {
1526             ClockGetTime(CLOCK_MONOTONIC, &before);
1527         }
1528         if (ret == NSTACKX_EAGAIN) {
1529             ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1530             if (ret != NSTACKX_EOK || session->closeFlag) {
1531                 break;
1532             }
1533             if (!canWrite) {
1534                 ret = NSTACKX_EAGAIN;
1535                 continue;
1536             }
1537         }
1538         if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1539             session->transFlag == NSTACKX_TRUE) {
1540             BindClientSendThreadToTargetCpu(0);
1541             isBind = NSTACKX_TRUE;
1542         }
1543         ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1544         if (ret < 0 && ret != NSTACKX_EAGAIN) {
1545             PeerShuttedEvent();
1546             PostFatalEvent(session);
1547             break;
1548         }
1549     }
1550     DFileSenderClose(session, queueNode, &unsent, arg);
1551     return NULL;
1552 }
1553 
/* Unlink and destroy every queue node still linked on "head". A NULL or empty list is a no-op. */
static void ClearDFileFrameList(List *head)
{
    if (head == NULL) {
        return;
    }
    if (ListIsEmpty(head)) {
        return;
    }

    List *cursor = NULL;
    List *next = NULL;
    LIST_FOR_EACH_SAFE(cursor, next, head) {
        QueueNode *entry = (QueueNode *)cursor;
        ListRemoveNode(&entry->list);
        DestroyQueueNode(entry);
    }
}
1567 
CheckDfileType(uint8_t type)1568 static inline int32_t CheckDfileType(uint8_t type)
1569 {
1570     if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1571         return NSTACKX_EFAILED;
1572     }
1573     return NSTACKX_EOK;
1574 }
1575 
/*
 * Validate a session type: NSTACKX_EOK when it lies in
 * [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER], NSTACKX_EFAILED otherwise.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    return (type >= DFILE_SESSION_TYPE_CLIENT && type <= DFILE_SESSION_TYPE_SERVER) ?
        NSTACKX_EOK : NSTACKX_EFAILED;
}
1583 
/*
 * Drain "head", dispatching each received frame to its handler, until the list is empty or the session is
 * closing. Frames with an oversized header length are dropped. Whatever is left on the list when the loop
 * stops is destroyed.
 *
 * Frame memory ownership: the frame buffer is freed here unless the frame is a FILE_DATA frame whose handling
 * succeeded, in which case the buffer has been passed on to the file manager. The queue node itself is always
 * freed here.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    /* Deliberately declared outside the loop: the value is carried across iterations. */
    int32_t handleFrameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }
        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *peerAddr = &node->peerAddr;
        /* Cache the type now: "frame" may no longer be valid once it has been handled. */
        uint8_t type = frame->header.type;

        if (ntohs(frame->header.length) > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            DFILE_LOGE(TAG, "header length %u is too big", ntohs(frame->header.length));
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(frame->header.type) != NSTACKX_EOK ||
            CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            handleFrameRet = NSTACKX_EFAILED;
        } else if (frame->header.type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, peerAddr, node->socketIndex);
        } else if (frame->header.type == NSTACKX_DFILE_RST_FRAME &&
                   frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, peerAddr);
        } else if (frame->header.type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, peerAddr);
        } else {
            /*
             * For NSTACKX_DFILE_FILE_DATA_FRAME, "frame" may be freed when handling fails and must not be
             * dereferenced afterwards.
             */
            handleFrameRet = DFileSessionHandleFrame(session, frame, peerAddr);
        }

        if (handleFrameRet != NSTACKX_EOK || type != NSTACKX_DFILE_FILE_DATA_FRAME) {
            /* A successfully handled FILE_DATA frame now belongs to the file manager; all others are freed. */
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }
    ClearDFileFrameList(head);
}
1630 
ReadEventHandle(void * arg)1631 static void ReadEventHandle(void *arg)
1632 {
1633     DFileSession *session = arg;
1634     List newHead;
1635     ListInitHead(&newHead);
1636     struct timespec before, now;
1637     if (!session->partReadFlag) {
1638         NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1639     } else {
1640         ClockGetTime(CLOCK_MONOTONIC, &before);
1641     }
1642     while (session->inboundQueueSize && !session->closeFlag) {
1643         if (session->partReadFlag) {
1644             ClockGetTime(CLOCK_MONOTONIC, &now);
1645             if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
1646                 break;
1647             }
1648         }
1649         if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1650             DFILE_LOGE(TAG, "PthreadMutexLock error");
1651             return;
1652         }
1653         ListMove(&session->inboundQueue, &newHead);
1654         session->recvBlockNumInner += session->inboundQueueSize;
1655         session->inboundQueueSize = 0;
1656         if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1657             DFILE_LOGE(TAG, "PthreadMutexUnlock error");
1658             break;
1659         }
1660         ProcessDFileFrameList(session, &newHead);
1661     }
1662     ClearDFileFrameList(&newHead);
1663 }
1664 
/*
 * Wrap a received frame in a queue node and append it to the session's inbound queue.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_ENOMEM when the queue already holds more than MAX_RECVBUF_COUNT
 * entries or the node cannot be created, and NSTACKX_EFAILED when locking or unlocking inboundQueueLock fails.
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
                                    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
        /* Log only once every MAX_NOMEM_PRINT queue sizes so the log is not flooded. */
        if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
            DFILE_LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
        }
        return NSTACKX_ENOMEM;
    }

    QueueNode *node = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    if (node == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        DestroyQueueNode(node);
        return NSTACKX_EFAILED;
    }
    ListInsertTail(&session->inboundQueue, &node->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* On unlock failure the node is already linked into the queue, so it must not be destroyed here. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
1694 
/* For server sessions, store the current monotonic time in session->measureBefore; otherwise do nothing. */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1701 
/*
 * Bind the calling receive thread to a CPU core.
 *
 * Non-server sessions always bind to CPU_IDX_2. Server sessions pick the core from the CPU count:
 *   cpus >= FIRST_CPU_NUM_LEVEL   -> no binding
 *   cpus >= SECOND_CPU_NUM_LEVEL  -> CPU_IDX_1
 *   cpus >= THIRD_CPU_NUM_LEVEL   -> CPU_IDX_0
 *   otherwise                     -> no binding
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t cpuCount = GetCpuNum();
    int32_t targetCpu;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        targetCpu = CPU_IDX_2;
        StartThreadBindCore(targetCpu);
        return;
    }

    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_1;
    } else if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_0;
    } else {
        return;
    }
    StartThreadBindCore(targetCpu);
}
1722 
/*
 * Post a ReadEventHandle event to the main loop unless MAX_UNPROCESSED_READ_EVENT_COUNT events are already
 * outstanding. mainLoopActiveReadFlag is cleared when the post succeeds and set when it fails.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    /* The event was not queued: undo the counter increment and raise mainLoopActiveReadFlag. */
    DFILE_LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1737 
/*
 * Handle one buffer read from a socket: decode it, queue it on the inbound queue, notify the main loop and
 * update the receive-rate statistics.
 *
 * A buffer that fails to decode, or that cannot be queued because of NSTACKX_ENOMEM, is silently dropped and
 * NSTACKX_EOK is returned. Any other queueing failure returns NSTACKX_EFAILED.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
                                     struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *decoded = NULL;

    if (DecodeDFileFrame(buf, bufLen, &decoded) != NSTACKX_EOK) {
        /* Frames that cannot be decoded are dropped without failing the receive path. */
        DFILE_LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    int32_t queueRet = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    if (queueRet == NSTACKX_ENOMEM) {
        /* Queue full or allocation failure: drop this frame but keep receiving. */
        return NSTACKX_EOK;
    }
    if (queueRet != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "frame add in bound queue failed :%d", queueRet);
        return NSTACKX_EFAILED;
    }

    PostReadEventToMainLoop(session);
    DFileRecvCalculateRate(session, decoded, peerAddr);
    return NSTACKX_EOK;
}
1760 
/*
 * One-time setup for the receiver thread: name the thread, refresh the receiver measure time, raise the
 * thread priority and record the thread id at POS_RECV_THERD_START in the bind info.
 */
static void DFileRecverPre(DFileSession *session)
{
    SetThreadName(DFFILE_RECV_THREAD_NAME);

    DFileReceiverUpdateSessionMeasureTime(session);

    SetMaximumPriorityForThread();

    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1768 
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for a read event on the receiving socket: the accepted socket once
 * acceptFlag is set, otherwise socket[0]. Readability is reported through canRead.
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    if (session->acceptFlag != 0) {
        return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
    }
    return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1777 
/* Receive from the session socket; thin wrapper that forwards to DFileSocketRecvSP(). */
int32_t DFileSocketRecv(DFileSession *session)
{
    int32_t recvRet = DFileSocketRecvSP(session);
    return recvRet;
}
1782 
/*
 * Accept a connection on socket[0] and store it in session->acceptSocket.
 *
 * On success acceptFlag is set, TCP keep-alive is configured on the accepted socket, the accept event is
 * raised and NSTACKX_EOK is returned. Returns NSTACKX_EFAILED when the accept fails.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        DFILE_LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    DFILE_LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);

    AcceptSocketEvent();
    return NSTACKX_EOK;
}
1799 
DFileReceiverHandle(void * arg)1800 void *DFileReceiverHandle(void *arg)
1801 {
1802     DFileSession *session = arg;
1803     uint8_t canRead = NSTACKX_FALSE;
1804     int32_t ret = NSTACKX_EAGAIN;
1805     uint8_t isBind = NSTACKX_FALSE;
1806 
1807     DFILE_LOGI(TAG, "recv thread start");
1808     DFileRecverPre(session);
1809     while (!session->closeFlag) {
1810         if (ret == NSTACKX_EAGAIN || !canRead) {
1811             ret = RcverWaitSocket(session, &canRead);
1812             if (ret != NSTACKX_EOK || session->closeFlag) {
1813                 break;
1814             }
1815         }
1816         if (!canRead) {
1817             continue;
1818         }
1819         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1820             BindServerRecvThreadToTargetCpu(session);
1821             isBind = NSTACKX_TRUE;
1822         }
1823 
1824         ret = DFileSocketRecv(session);
1825         if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1826             PeerShuttedEvent();
1827             break;
1828         }
1829     }
1830     DFILE_LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect,
1831         session->recvBlockNumInner);
1832     if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1833         PostFatalEvent(session);
1834     }
1835 
1836     DFILE_LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1837     return NULL;
1838 }
1839 
DFileControlHandle(void * arg)1840 void *DFileControlHandle(void *arg)
1841 {
1842     SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1843     DFileSession *session = arg;
1844     if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1845         DFileSenderControlHandle(session);
1846     } else {
1847         DFileReceiverControlHandle(session);
1848     }
1849     return NULL;
1850 }
1851 
/*
 * Replace every entry of fileListInfo->files[] with its canonical absolute path from realpath(), freeing the
 * previous string.
 *
 * Stops at the first entry that cannot be resolved and returns NSTACKX_EFAILED; entries before it have
 * already been replaced. Returns NSTACKX_EOK when all entries were resolved.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    for (uint32_t idx = 0; idx < fileListInfo->fileNum; idx++) {
        char *givenName = fileListInfo->files[idx];
        /* Passing NULL makes realpath() allocate the result buffer. */
        char *resolvedName = realpath(givenName, NULL);
        if (resolvedName == NULL) {
            DFILE_LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        fileListInfo->files[idx] = resolvedName;
        free(givenName);
    }
    return NSTACKX_EOK;
}
1871 
/*
 * Free a FileListInfo container: the files[] array (but not the strings it points to), the remotePath string
 * and the structure itself.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    free(fileListInfo->files);
    fileListInfo->files = NULL;

    /* free(NULL) is a no-op, so no guard is needed for an unset remotePath. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo);
}
1882 
/*
 * Create a sending transfer for fileListInfo and link it onto the session's transfer chain.
 *
 * The next transfer id is lastDFileTransId + 1, skipping 0 on wrap-around. On success the transfer is appended
 * to dFileTransChain, lastDFileTransId and the small/normal processing counter are updated, and the
 * fileListInfo container is freed (its files[] strings are reused by the transfer).
 *
 * Returns NSTACKX_EOK on success; NSTACKX_ENOMEM when the transfer cannot be created; NSTACKX_EFAILED when no
 * peer info is available, a path cannot be resolved or the extra info cannot be added; or the error returned
 * by DFileTransSendFiles(). On failure fileListInfo is not freed here.
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    uint16_t transId = session->lastDFileTransId + 1;
    if (transId == 0) { /* overflow */
        transId = 1;
    }

    PeerInfo *peerInfo = TransSelectPeerInfo(session);
    if (peerInfo == NULL) {
        /* peerInfo->mtuInuse is read below; bail out instead of dereferencing a NULL peer info. */
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
    if (trans == NULL) {
        DFILE_LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to set the MTU is only logged; the transfer still proceeds. */
    if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    int32_t ret = DFileTransSendFiles(trans, fileListInfo);
    if (ret != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        DFILE_LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
        return ret;
    }
    ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
                                 fileListInfo->userData);
    if (ret != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    trans->fileList->tarFlag = fileListInfo->tarFlag;
    trans->fileList->smallFlag = fileListInfo->smallFlag;
    trans->fileList->tarFile = fileListInfo->tarFile;
    trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;

    /* userData was handed to DFileTransAddExtraInfo() above; clear the pointer in the container. */
    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    session->lastDFileTransId = transId;
    if (fileListInfo->smallFlag == NSTACKX_TRUE) {
        session->smallListProcessingCnt++;
    } else {
        session->fileListProcessingCnt++;
    }
    /* Elements in fileListInfo->files[] are reused by the transfer, so only the container is freed. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1934 
/*
 * Start a sending transfer for fileListInfo while holding session->transIdLock.
 *
 * Returns NSTACKX_EFAILED when the lock cannot be taken, otherwise the result of DFileStartTransInner().
 * A failure to release the lock is logged but does not change the result.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    int32_t startRet;

    if (PthreadMutexLock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    startRet = DFileStartTransInner(session, fileListInfo);

    if (PthreadMutexUnlock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1947 
TerminateMainThreadInner(void * arg)1948 void TerminateMainThreadInner(void *arg)
1949 {
1950     DFileSession *session = (DFileSession *)arg;
1951     DFileSessionSetTerminateFlag(session);
1952 }
1953 
/*
 * Start the session threads in order: main loop, sender, receiver, control.
 *
 * If a later thread cannot be created, the threads that were already started are asked to terminate and are
 * joined in reverse order of creation (each label below unwinds one thread and falls through to the next).
 *
 * Returns NSTACKX_EOK when all threads are running, NSTACKX_EFAILED otherwise.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        DFILE_LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        DFILE_LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        DFILE_LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;
L_ERR_CONTROL_THREAD:
    /* The control thread was never created; the thread to reap at this step is the receiver. */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /*
     * Wake the sender before joining it: an idle sender blocks in SemWait() on outboundQueueWait and would
     * otherwise never observe the terminate flag.
     * NOTE(review): assumes PostOutboundQueueWait() posts outboundQueueWait -- confirm against its definition.
     */
    PostOutboundQueueWait(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post a terminate event to the main loop's event chain before joining the main loop thread. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1994 
/*
 * File manager message callback (context is the DFileSession): an inner error is logged and raised as a
 * fatal session event; an in-progress message triggers a progress notification. Other messages are ignored.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            DFILE_LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
2007 
/*
 * Validate the arguments and create the session's file manager.
 *
 * Returns NSTACKX_EINVAL for a NULL session or, for a sender, a connection type other than P2P or WLAN.
 * Returns NSTACKX_EFAILED when a key is supplied (keyLen > 0) but is NULL, has a length other than
 * AES_128_KEY_LENGTH or CHACHA20_KEY_LENGTH, or crypto support is not included; and when the file manager
 * cannot be created. Returns NSTACKX_EOK on success.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    if (session == NULL) {
        DFILE_LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }
    if (isSender && connType != CONNECT_TYPE_P2P && connType != CONNECT_TYPE_WLAN) {
        DFILE_LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }

    if (keyLen > 0) {
        uint8_t lengthOk = (keyLen == AES_128_KEY_LENGTH || keyLen == CHACHA20_KEY_LENGTH);
        if (!lengthOk || key == NULL) {
            DFILE_LOGE(TAG, "error key or key len");
            return NSTACKX_EFAILED;
        }
        if (!IsCryptoIncluded()) {
            DFILE_LOGE(TAG, "crypto is not opened");
            return NSTACKX_EFAILED;
        }
    }

    /* Messages from the file manager are delivered to FileManagerMsgHandle() with the session as context. */
    FileManagerMsgPara msgPara;
    msgPara.epollfd = session->epollfd;
    msgPara.eventNodeChain = &session->eventNodeChain;
    msgPara.msgReceiver = FileManagerMsgHandle;
    msgPara.context = session;

    session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
    if (session->fileManager == NULL) {
        DFILE_LOGE(TAG, "create filemanager failed");
        return NSTACKX_EFAILED;
    }
    return NSTACKX_EOK;
}
2041 
/*
 * Close both ends of the receiver pipe (PIPE_OUT first, then PIPE_IN) and mark them INVALID_PIPE_DESC.
 * Ends that are already invalid are left untouched.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const uint32_t pipeEnds[] = { PIPE_OUT, PIPE_IN };

    for (uint32_t k = 0; k < sizeof(pipeEnds) / sizeof(pipeEnds[0]); k++) {
        uint32_t end = pipeEnds[k];
        if (session->receiverPipe[end] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[end]);
        session->receiverPipe[end] = INVALID_PIPE_DESC;
    }
}
2053 
/* Install func as the global DFile event callback (g_dfileEventFunc). softObj is unused. */
void DFileSetEvent(void *softObj, DFileEventFunc func)
{
    (void)softObj;
    g_dfileEventFunc = func;
}
2059 
NSTACKX_DFileAssembleFunc(void * softObj,const DFileEvent * info)2060 void NSTACKX_DFileAssembleFunc(void *softObj, const DFileEvent *info)
2061 {
2062     if (g_dfileEventFunc != NULL) {
2063         g_dfileEventFunc(softObj, info);
2064     }
2065 }
2066