• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nstackx_dfile_session.h"
17 
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_dfile_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 #include "nstackx_dfile_dfx.h"
38 
/* Log tag passed as the first argument to the DFILE_LOG* macros in this file. */
#define TAG "nStackXDFile"

#define DEFAULT_WAIT_TIME_MS               1000
#define MULTI_THREADS_SOCKET_WAIT_TIME_MS  1
/* Setting-negotiation timer value used for client sessions when CapsTcp() is false (presumably milliseconds). */
#define DEFAULT_NEGOTIATE_TIMEOUT          1000
/* Setting-negotiation timer value used for client sessions when CapsTcp() is true. */
#define TCP_NEGOTIATE_TIMEOUT              10000
/* Timer value for the server-side setting timer; on expiry ServerSettingTimeoutHandle() drops the peer. */
#define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
/* Upper bound on client setting retries and on setting frames a server accepts from one peer. */
#define MAX_NEGOTIATE_TIMEOUT_COUNT        3
#define MAX_UNPROCESSED_READ_EVENT_COUNT   3
#define MAX_RECVBUF_COUNT                  130000
#define MAX_NOMEM_PRINT                    10000

/* Forward declarations of file-local functions that are used before their definitions. */
static void ReadEventHandle(void *arg);
static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
/* Global event callback; NOTE(review): its users are not visible in this part of the file. */
DFileEventFunc g_dfileEventFunc;
54 
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)55 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
56     const struct sockaddr_in *peerAddr, uint8_t socketIndex)
57 {
58     QueueNode *queueNode = NULL;
59 
60     if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
61         return NULL;
62     }
63 
64     queueNode = calloc(1, sizeof(QueueNode));
65     if (queueNode == NULL) {
66         return NULL;
67     }
68 
69     queueNode->frame = calloc(1, length);
70     if (queueNode->frame == NULL) {
71         free(queueNode);
72         return NULL;
73     }
74     queueNode->length = length;
75     queueNode->sendLen = 0;
76 
77     if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
78         DestroyQueueNode(queueNode);
79         return NULL;
80     }
81     if (peerAddr != NULL) {
82         if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
83             DestroyQueueNode(queueNode);
84             return NULL;
85         }
86     }
87     queueNode->socketIndex = socketIndex;
88 
89     return queueNode;
90 }
91 
/*
 * Release a QueueNode together with the frame buffer it owns.
 * A NULL queueNode is ignored.
 */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }
    free(queueNode->frame);
    free(queueNode);
}
99 
/*
 * Forward a message to the receiver callback registered on the session.
 *
 * The callback is invoked with the session id, the message type and the message.
 * When the session or its msgReceiver is NULL the call is skipped and a log line is written.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        DFILE_LOGI(TAG, "session is NULL");
        return;
    }

    if (session->msgReceiver != NULL) {
        session->msgReceiver(session->sessionId, msgType, msg);
        return;
    }

    DFILE_LOGI(TAG, "msgReceiver is NULL");
}
114 
/*
 * Start a new transfer-rate measurement window for the session.
 *
 * Nothing is reset while work is outstanding: a client session with queued file lists
 * (pending, and small-file lists when NSTACKX_SMALL_FILE_SUPPORT is defined), or any session
 * with a non-empty dFileTransChain, leaves the counters untouched.
 * Otherwise the byte and transfer counters are cleared and startTs is set to the current
 * CLOCK_MONOTONIC time.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }

    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    DFILE_LOGI(TAG, "begin to calculate transfer rate");
    session->transCount = 0;
    session->bytesTransferred = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
135 
136 #ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start a transfer from the session's small-file list queue.
 *
 * Small lists are only considered when a regular file list is being processed or no regular
 * file list is pending. Entries are popped one by one until DFileStartTrans() succeeds; each
 * entry that fails is reported through DFILE_ON_FILE_SEND_FAIL and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    const int allowed = (session->fileListProcessingCnt > 0) || (session->fileListPendingCnt == 0);
    if (!allowed) {
        return NSTACKX_FALSE;
    }

    while (!ListIsEmpty(&session->smallFileLists)) {
        FileListInfo *info = (FileListInfo *)ListPopFront(&session->smallFileLists);
        session->smallListPendingCnt--;
        if (info == NULL) {
            continue;
        }

        int32_t err = DFileStartTrans(session, info);
        if (err == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", err);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = err;
        failMsg.fileList.files = (const char **)info->files;
        failMsg.fileList.fileNum = info->fileNum;
        failMsg.fileList.userData = info->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        /* The receiver has been notified; the list is destroyed only after that. */
        DestroyFileListInfo(info);
    }
    return NSTACKX_FALSE;
}
164 #endif
165 
/*
 * Start the next transfer from the session's pending file-list queue.
 *
 * Entries are popped in order until DFileStartTrans() succeeds for one of them or the queue
 * is empty. Each entry that fails to start is reported through DFILE_ON_FILE_SEND_FAIL and
 * then destroyed.
 */
static void SendPendingList(DFileSession *session)
{
    while (!ListIsEmpty(&session->pendingFileLists)) {
        FileListInfo *info = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        session->fileListPendingCnt--;
        if (info == NULL) {
            continue;
        }

        int32_t err = DFileStartTrans(session, info);
        if (err == NSTACKX_EOK) {
            return;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", err);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = err;
        failMsg.fileList.files = (const char **)info->files;
        failMsg.fileList.fileNum = info->fileNum;
        failMsg.fileList.userData = info->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(info);
    }
}
190 
/*
 * Kick off the next queued file list for the session.
 *
 * With NSTACKX_SMALL_FILE_SUPPORT the small-file queue is tried first and the pending queue
 * is used only when no small list was started; without it only the pending queue exists.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
206 
/*
 * Report session-wide progress to the message receiver.
 *
 * Total and transferred byte counts are read from the session's file manager. The
 * DFILE_ON_SESSION_IN_PROGRESS notification is sent only when both reads succeed and
 * 0 < bytesTransferred <= totalBytes.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }

    if ((progress.transferUpdate.bytesTransferred <= progress.transferUpdate.totalBytes) &&
        (progress.transferUpdate.bytesTransferred > 0)) {
        NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
    }
}
218 
/*
 * Fill the transferUpdate part of a transfer message for terminal file events.
 *
 * Only FILE_RECEIVED, FILE_RECEIVE_FAIL, FILE_SEND_FAIL and FILE_SENT are handled; any other
 * message type, or a NULL argument, leaves msg untouched.
 *
 * For FILE_RECEIVED / FILE_SENT the transfer's own total is used as both total and transferred
 * bytes when it is non-zero. In every other case the numbers come from the file manager, and
 * the function returns early (with only transId written) if that query fails.
 *
 * For FILE_SENT an extra DFILE_ON_TRANS_IN_PROGRESS notification is emitted; when the file list
 * has vtransFlag set, the reported transId is replaced by vtransRealTransId first.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
                                 DFileTransMsgType msgType, DFileTransMsg *msg)
{
    if (session == NULL || dFileTrans == NULL || msg == NULL) {
        return;
    }
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVED:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
            break;
        default:
            return;
    }

    msg->transferUpdate.transId = dFileTrans->transId;

    uint64_t total = 0;
    uint64_t transferred = 0;
    if (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT) {
        total = DFileTransGetTotalBytes(dFileTrans);
    }

    if (total > 0) {
        /* A completed transfer with a known size has moved every byte. */
        transferred = total;
    } else if (FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &transferred) !=
        NSTACKX_EOK) {
        return;
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = transferred;

    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
257 
/*
 * Wake the threads that push outbound data.
 *
 * The outbound-queue semaphore for socketIndex is always posted. When the transfer is a sender
 * and the session has more than one client send thread, the sendWait semaphore of the first
 * (clientSendThreadNum - 1) entries in sendThreadPara is posted as well.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);

    if (!isSender || session->clientSendThreadNum <= 1) {
        return;
    }

    uint16_t idx = 0;
    while (idx < session->clientSendThreadNum - 1) {
        SemPost(&session->sendThreadPara[idx].sendWait);
        idx++;
    }
}
267 
/*
 * Account a finished transfer and, once the session is idle, publish the average rate.
 *
 * Only FILE_SENT and END messages are counted. totalBytes is added to the session counter with
 * saturation at UINT64_MAX and the transfer count is incremented. The rate is reported only
 * when no transfer remains in dFileTransChain and, for client sessions, no file list is queued.
 *
 * The rate is computed in MB/s from the time elapsed since session->startTs, logged, delivered
 * through DFILE_ON_SESSION_TRANSFER_RATE (truncated to uint32_t, clamped to UINT32_MAX) and
 * passed to TransferCompleteEvent(). A zero elapsed time suppresses the report.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }

    /* Saturating add: never let the byte counter wrap around. */
    if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
        session->bytesTransferred += totalBytes;
    } else {
        session->bytesTransferred = UINT64_MAX;
    }
    session->transCount++;

    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
        || !ListIsEmpty(&session->smallFileLists))) {
        return;
    }
#else
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
        return;
    }
#endif

    struct timespec endTs;
    ClockGetTime(CLOCK_MONOTONIC, &endTs);

    uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
    if (spendTime == 0) {
        /* Avoid dividing by zero when start and end fall into the same millisecond. */
        return;
    }

    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
    /* %llu requires unsigned long long; the 64-bit counter may be unsigned long on LP64 targets. */
    DFILE_LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
         session->transCount, (unsigned long long)session->bytesTransferred, spendTime, rate);

    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    /* Converting an out-of-range double to an unsigned integer is undefined, so clamp first. */
    msgData.rate = (rate >= (double)UINT32_MAX) ? UINT32_MAX : (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);

    TransferCompleteEvent(rate);
}
309 
/*
 * Tear down a transfer once it has reached a terminal message.
 *
 * Terminal messages are FILE_RECEIVE_FAIL, FILE_SENT, FILE_SEND_FAIL and END; anything else is
 * ignored. For a terminal message the trans id is marked STATE_TRANS_DONE, the peer's
 * currentTransCount is decremented (not below zero), the transfer is unlinked and destroyed.
 * On client sessions the matching processing counter is decremented and the next queued list
 * is started. Finally the session rate bookkeeping is updated with the transfer's total bytes.
 *
 * dFileTrans must not be used by the caller after this function returns for a terminal message.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_END:
            break;
        default:
            return;
    }

    /* Everything needed after DFileTransDestroy() is captured before the transfer is freed. */
    uint8_t isSmallList = dFileTrans->fileList->smallFlag;
    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans id state fail");
    }

    PeerInfo *peer = (PeerInfo *)dFileTrans->context;
    if (peer->currentTransCount > 0) {
        peer->currentTransCount--;
    }

    ListRemoveNode(&dFileTrans->list);
    uint64_t finishedBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (isSmallList == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }

    CalculateSessionTransferRate(session, finishedBytes, msgType);
}
335 
/*
 * Callback that receives events from a DFileTrans and maps them to session-level actions.
 *
 * The owning PeerInfo is taken from dFileTrans->context and the session from that peer.
 * Progress fields of msg are refreshed first, then the event is dispatched: most events become
 * a notification to the session's message receiver, SEND_DATA wakes the send threads and
 * SEND_ACK triggers ProcessSessionTrans(). Afterwards CheckTransDone() releases the transfer
 * if the event was terminal.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
                              DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *owner = peer->session;

    UpdateMsgProcessInfo(owner, dFileTrans, msgType, msg);

    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            WakeSendThread(owner, dFileTrans->isSender, peer->socketIndex);
            break;
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            NotifyMsgRecver(owner, DFILE_ON_FILE_LIST_RECEIVED, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED:
            /* Receiver side: all file data has arrived. */
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_FAIL, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SENT:
            /* Sender side: the TRANSFER DONE ACK frame was sent and the transfer has ended. */
            NoticeSessionProgress(owner);
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_FAIL, msg);
            break;
        case DFILE_TRANS_MSG_IN_PROGRESS:
            NotifyMsgRecver(owner, DFILE_ON_TRANS_IN_PROGRESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            ProcessSessionTrans(owner, dFileTrans->transId);
            break;
        default:
            DFILE_LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            break;
    }

    /* May destroy dFileTrans; it must not be touched after this call. */
    CheckTransDone(owner, dFileTrans, msgType);
}
377 
ServerSettingTimeoutHandle(void * data)378 static void ServerSettingTimeoutHandle(void *data)
379 {
380     PeerInfo *peerInfo = data;
381     ListRemoveNode(&peerInfo->list);
382     TimerDelete(peerInfo->settingTimer);
383     peerInfo->session->peerInfoCnt--;
384     free(peerInfo);
385     DFILE_LOGI(TAG, "DFileServer Setting Negotationion timeout");
386 }
387 
ClientSettingTimeoutHandle(void * data)388 static void ClientSettingTimeoutHandle(void *data)
389 {
390     PeerInfo *peerInfo = data;
391     uint8_t cnt = peerInfo->settingTimeoutCnt++;
392     DFileMsg msgData;
393     uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
394     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
395     if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
396         TimerDelete(peerInfo->settingTimer);
397         peerInfo->settingTimer = NULL;
398         peerInfo->settingTimeoutCnt = 0;
399         msgData.errorCode = NSTACKX_EFAILED;
400         DFILE_LOGI(TAG, "DFileClient Setting Negotationion timeout");
401         NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
402     } else {
403         DFileSessionSendSetting(peerInfo);
404         DFILE_LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
405         if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
406             msgData.errorCode = NSTACKX_EFAILED;
407             NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
408             DFILE_LOGE(TAG, "Timer setting timer fail");
409         }
410     }
411 }
412 
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)413 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
414 {
415     List *pos = NULL;
416     DFileTrans *trans = NULL;
417 
418     if (dFileTransChain == NULL || transId == 0) {
419         return NULL;
420     }
421 
422     LIST_FOR_EACH(pos, dFileTransChain) {
423         trans = (DFileTrans *)pos;
424         if (trans->transId == transId) {
425             return trans;
426         }
427     }
428     return NULL;
429 }
430 
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)431 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
432 {
433     List *pos = NULL;
434     PeerInfo *peerInfo = NULL;
435     LIST_FOR_EACH(pos, &session->peerInfoChain) {
436         peerInfo = (PeerInfo *)pos;
437         if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
438             peerInfo->session->sessionId == session->sessionId) {
439             return peerInfo;
440         }
441     }
442     return NULL;
443 }
444 
/*
 * Encode a setting frame for the peer and send it, starting the setting timer if needed.
 *
 * The frame carries the peer's connType, the local MTU, the session's LINK_SEQUENCE capability
 * bit, a zero dataFrameSize and the RECV_FEEDBACK capsCheck flag; cipher capabilities are added
 * when the file manager has a key (keyLen != 0).
 *
 * If the peer has no setting timer yet, one is started before sending:
 *   - client sessions use ClientSettingTimeoutHandle with the TCP or default negotiate timeout;
 *     a failure to start the timer is reported as DFILE_ON_CONNECT_FAIL and nothing is sent;
 *   - other sessions use ServerSettingTimeoutHandle with MAX_SERVER_NEGOTIATE_VALID_TIMEOUT;
 *     a failure to start the timer returns silently (callers check peerInfo->settingTimer).
 *
 * A write result other than the full frame length or NSTACKX_EAGAIN is reported as
 * DFILE_ON_CONNECT_FAIL.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    DFileSession *session = peerInfo->session;
    uint32_t timeout = CapsTcp(session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    SettingFrame settingFramePara;

    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    /*
     * Zero the whole parameter block: only some fields are assigned below, and the encoder
     * must never see indeterminate stack contents in the remaining ones.
     */
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    if (session->fileManager->keyLen) {
        DFileGetCipherCaps(session, &settingFramePara);
    }
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    if (peerInfo->settingTimer == NULL) {
        if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
            peerInfo->settingTimer = TimerStart(session->epollfd, timeout,
                                                NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "setting timmer creat fail");
                data.errorCode = NSTACKX_EFAILED;
                NotifyMsgRecver(session, DFILE_ON_CONNECT_FAIL, &data);
                return;
            }
        } else {
            peerInfo->settingTimer = TimerStart(session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                                                NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                return;
            }
        }
    }

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(session, DFILE_ON_CONNECT_FAIL, &data);
    }
}
495 
/*
 * Apply a negotiated DFileConfig to a peer and to the session's file manager.
 *
 * The peer's maximum rate and data frame size are taken from dFileConfig, its loss-rate
 * history and fast-start counter are cleared and measureBefore is set to the current
 * monotonic time. The initial sendRate is the configured rate divided by the WLAN or P2P
 * divisor; outside LiteOS builds, a WLAN peer without a known wifi rate instead derives it
 * from NSTACKX_WLAN_INIT_RATE and the data frame size. The result is never below
 * NSTACKX_MIN_SENDRATE.
 *
 * The file manager receives the new maximum frame length and, on server sessions, the receive
 * parameters for connType. Failures of those calls are only logged.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    const int isWlan = (connType == CONNECT_TYPE_WLAN);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;

    if (isWlan) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    if (isWlan && !peerInfo->gotWifiRate) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif

    /* Enforce the lower bound on the starting rate. */
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set max frame length");
    }

    DFILE_LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
         connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);

    if (session->sessionType == DFILE_SESSION_TYPE_SERVER &&
        FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set recv para");
    }
}
532 
/*
 * Record the result of a decoded setting frame on the peer.
 *
 * Stores the negotiation state, the peer's MTU and connection type and the remote session id,
 * and sets mtuInuse to the smaller of the local MTU and the peer's MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    peerInfo->state = state;
    peerInfo->mtu = settingFrame->mtu;
    peerInfo->connType = settingFrame->connType;

    uint16_t ownMtu = peerInfo->localMtu;
    if (ownMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = ownMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }

    peerInfo->remoteSessionId = settingFrame->header.sessionId;
}
542 
/*
 * Client-side handling of a setting frame received from the server.
 *
 * The frame is decoded; undecodable frames and frames with a zero MTU are dropped, as are
 * frames from an address with no registered PeerInfo. For a known peer the setting timer is
 * stopped, the peer is marked SETTING_NEGOTIATED, every transfer in dFileTransChain gets the
 * negotiated MTU and DFILE_ON_CONNECT_SUCCESS is reported. The session configuration for the
 * negotiated MTU and connection type is then applied, the LINK_SEQUENCE capability is cleared
 * locally when the server did not advertise it, and the cipher type is chosen.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);

    /* Frames that fail to decode (e.g. unsupported version) or carry no MTU are ignored. */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer setting, maybe be attacked");
        return;
    }

    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
    /* Negotiation answered: stop the retry timer. */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);

    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "set trans mtu failed");
        }
    }

    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }

    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* The bit is set, so the server does support Link Sequence; the capability is kept. */
        DFILE_LOGI(TAG, "server replies support Link Sequence");
    } else {
        DFILE_LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
    DFileChooseCipherType(&hostSettingFrame, session);
}
586 
/*
 * Return the local MTU to advertise for a socket protocol.
 *
 * UDP and TCP both use NSTACKX_DEFAULT_FRAME_SIZE. D2D is not supported: an error is logged
 * and 0 is returned. Any other protocol value also yields 0.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    if (protocol == NSTACKX_PROTOCOL_UDP || protocol == NSTACKX_PROTOCOL_TCP) {
        return NSTACKX_DEFAULT_FRAME_SIZE;
    }

    if (protocol == NSTACKX_PROTOCOL_D2D) {
        /* D2D would need its own MTU query, which is not available. */
        DFILE_LOGE(TAG, "d2d not support");
    }
    return 0;
}
601 
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)602 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
603     uint16_t connType, uint8_t socketIndex)
604 {
605     if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
606         return NULL;
607     }
608     PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
609     if (peerInfo == NULL) {
610         return NULL;
611     }
612     peerInfo->session = session;
613     peerInfo->dstAddr = *dstAddr;
614     peerInfo->connType = connType;
615     peerInfo->socketIndex = socketIndex;
616     peerInfo->localMtu = DFileGetMTU(session->protocol);
617     session->peerInfoCnt++;
618     peerInfo->gotWifiRate = 0;
619     peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
620     peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
621 
622     if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
623         peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
624         DFILE_LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
625     }
626 
627     if (peerMtu == 0) {
628         return peerInfo;
629     }
630     peerInfo->mtu = peerMtu;
631     peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
632     return peerInfo;
633 }
634 
/*
 * Server-side reaction to the capability bits of a client's setting frame.
 *
 * The session's LINK_SEQUENCE capability is cleared when the client did not request it, and
 * the session's RECV_FEEDBACK capsCheck flag is set or cleared to mirror the client's frame.
 *
 * NOTE(review): RECV_FEEDBACK is tested in hostSettingFrame->capability, while
 * DFileSessionSendSetting() places that flag in the capsCheck field - confirm against
 * DecodeSettingFrame() that this is the intended field.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    const int wantsLinkSeq = (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) != 0;
    const int hasRecvFeedback = (hostSettingFrame->capability & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) != 0;

    if (!wantsLinkSeq) {
        DFILE_LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    } else {
        DFILE_LOGI(TAG, "client wants to enable Link Sequence");
    }

    if (!hasRecvFeedback) {
        DFILE_LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        DFILE_LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
652 
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)653 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
654     uint8_t socketIndex)
655 {
656     PeerInfo *peerInfo = NULL;
657 
658     peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
659     if (peerInfo == NULL) {
660         return NULL;
661     }
662 
663     peerInfo->remoteSessionId = frame->header.sessionId;
664     peerInfo->state = SETTING_NEGOTIATING;
665     DFileSessionSendSetting(peerInfo);
666     if (peerInfo->settingTimer == NULL) {
667         free(peerInfo);
668         session->peerInfoCnt--;
669         return NULL;
670     }
671     ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
672     return peerInfo;
673 }
674 
/*
 * Server-side handling of a setting frame received from a client.
 *
 * Undecodable frames and frames with a zero MTU are dropped. For an already known peer the
 * frame is refused once settingTimeoutCnt has reached MAX_NEGOTIATE_TIMEOUT_COUNT; otherwise
 * the peer data is refreshed and a setting frame is sent back. For an unknown address a new
 * PeerInfo is allocated (which also sends the reply); allocation failure drops the frame.
 *
 * In both accepted cases the peer's setting counter is incremented, the remote version is
 * recorded, the session configuration for the negotiated MTU and connection type is applied,
 * and the capability and cipher negotiation helpers are run.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame decoded;
    DFileConfig config;
    (void)memset_s(&decoded, sizeof(decoded), 0, sizeof(decoded));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &decoded) != NSTACKX_EOK || decoded.mtu == 0) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        /* First setting from this address. */
        peer = AllocPeerInfo(session, peerAddr, &decoded, socketIndex);
        if (peer == NULL) {
            return;
        }
    } else {
        if (peer->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
            DFILE_LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
            return;
        }
        DFileSessionSetPeerInfo(peer, SETTING_NEGOTIATING, &decoded);
        DFileSessionSendSetting(peer);
    }

    peer->settingTimeoutCnt++;
    DFILE_LOGI(TAG, "DFileServer response Setting Frame. count %u", peer->settingTimeoutCnt);
    peer->remoteDFileVersion = decoded.dFileVersion;
    if (GetDFileConfig(&config, peer->mtuInuse, decoded.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, decoded.connType, peer);
    }
    HandleLinkSeqCap(session, &decoded);
    DFileChooseCipherType(&decoded, session);
}
714 
/*
 * Dispatch a received setting frame to the server or client handler based on the session
 * type. Frames arriving on a session of any other type are ignored.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
        DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
        return;
    }

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
    }
}
726 
/*
 * React to a peer RST carrying NSTACKX_DFILE_WITHOUT_SETTING_ERROR.
 * The RST is ignored when the sender is not a known peer. Otherwise the MTU of every transfer of the
 * session is set to 0 and a new Setting frame is sent to the first matching peer so that the setting
 * negotiation starts over.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *node = NULL;

    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer rst, maybe be attacked");
        return;
    }

    /*
     * When the client receives peer RST NSTACKX_DFILE_WITHOUT_SETTING_ERROR, the trans MTU is set to 0;
     * it will be set again after the new setting negotiation.
     */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        if (DFileTransSetMtu((DFileTrans *)node, 0) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    /* Restart the negotiation with the first peer whose address and session id both match. */
    LIST_FOR_EACH(node, &session->peerInfoChain) {
        PeerInfo *candidate = (PeerInfo *)node;
        if (memcmp(&candidate->dstAddr, peerAddr, sizeof(struct sockaddr_in)) != 0 ||
            candidate->session->sessionId != session->sessionId) {
            continue;
        }
        TimerDelete(candidate->settingTimer);
        candidate->settingTimer = NULL;
        candidate->state = SETTING_NEGOTIATING;
        DFILE_LOGD(TAG, "Send Setting Frame");
        DFileSessionSendSetting(candidate);
        break;
    }
}
762 
/*
 * Handle a received RST frame.
 * The frame is dropped silently when it cannot be decoded. NSTACKX_DFILE_WITHOUT_SETTING_ERROR triggers
 * HandleWithoutSettingError for the sending peer; every other error code is only logged.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t errCode = 0;

    if (DecodeRstFrame((RstFrame *)dFileFrame, &errCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    /* The transfer id is only used for the debug log below. */
    uint16_t transId = ntohs(dFileFrame->header.transId);
    DFILE_LOGD(TAG, "handle RST (%hu) frame, transId %hu", errCode, transId);

    if (errCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
    } else {
        DFILE_LOGE(TAG, "Unspported error code %hu", errCode);
    }
}
782 
/*
 * Apply a decoded back pressure notification to the first `count` entries of session->stopSendCnt.
 * When backPress.recvListOverIo is 1 each counter is incremented, otherwise each counter is reset to 0.
 * The update is done while holding session->backPressLock; nothing is changed if the lock cannot be taken.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    const uint8_t overIo = (backPress.recvListOverIo == 1);
    for (uint32_t i = 0; i < count; i++) {
        if (overIo) {
            session->stopSendCnt[i]++;
        } else {
            session->stopSendCnt[i] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
809 
/*
 * Handle a received back pressure frame.
 * The frame is ignored when the sender is not a known peer or when it cannot be decoded. Otherwise the
 * decoded values are applied to the stop-send counters of all session->clientSendThreadNum send threads.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    DataBackPressure backPress;

    /* The peer lookup is only a validity check; the peer info itself is not used further. */
    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return;
    }

    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &backPress) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, backPress, session->clientSendThreadNum);

    DFILE_LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
         backPress.recvListOverIo, backPress.recvBufThreshold,
         backPress.stopSendPeriod);
}
832 
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)833 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
834 {
835     DFileTransPara transPara;
836 
837     if (peerInfo == NULL) {
838         return NULL;
839     }
840     (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
841     transPara.isSender = isSender;
842     transPara.transId = transId; /* for receiver, use transId of sender */
843     transPara.fileManager = session->fileManager;
844     transPara.writeHandle = DFileWriteHandle;
845     transPara.msgReceiver = DTransMsgReceiver;
846     transPara.connType = peerInfo->connType;
847     transPara.context = peerInfo;
848     transPara.session = session;
849     transPara.onRenameFile = session->onRenameFile;
850 
851     ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
852     return DFileTransCreate(&transPara);
853 }
854 
/*
 * Route a received transfer frame to the DFileTrans it belongs to.
 *
 * The sender must be a known peer and the frame must carry a non-zero transId. On a server session the
 * first such frame from a peer that is still negotiating marks the negotiation as finished and removes
 * the setting timer. If no transfer with that id exists, a receiver transfer is created, but only for a
 * file HEADER frame whose transId has not been completed before.
 *
 * Returns the result of HandleDFileFrame, or NSTACKX_EFAILED when the frame is rejected or the transfer
 * cannot be created (in the latter case DFILE_ON_FATAL_ERROR with NSTACKX_ENOMEM is reported).
 */
static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        DFILE_LOGI(TAG, "can't find peerInfo");
        return NSTACKX_EFAILED;
    }

    if (peer->session->sessionType == DFILE_SESSION_TYPE_SERVER && peer->state == SETTING_NEGOTIATING) {
        peer->state = SETTING_NEGOTIATED;
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
    }

    uint16_t transId = ntohs(dFileFrame->header.transId);
    if (transId == 0) {
        DFILE_LOGE(TAG, "transId is 0");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), transId);
    if (trans != NULL) {
        return HandleDFileFrame(trans, dFileFrame);
    }

    /* Unknown transfer: only HEADER frame can start dfile transfer (receiver) */
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
        DFILE_LOGE(TAG, "trans %u is NULL && type is %u", transId, dFileFrame->header.type);
        return NSTACKX_EFAILED;
    }
    if (IsTransIdDone(session, transId) == NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }

    trans = CreateTrans(transId, session, peer, NSTACKX_FALSE);
    if (trans == NULL) {
        DFileMsg fatalMsg;
        (void)memset_s(&fatalMsg, sizeof(fatalMsg), 0, sizeof(fatalMsg));
        fatalMsg.errorCode = NSTACKX_ENOMEM;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &fatalMsg);
        DFILE_LOGE(TAG, "trans is NULL");
        return NSTACKX_EFAILED;
    }

    /* A failure to set the MTU is logged only; the transfer is still registered. */
    if (DFileTransSetMtu(trans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    CalculateSessionTransferRatePrepare(session);
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    peer->currentTransCount++;

    return HandleDFileFrame(trans, dFileFrame);
}
904 
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)905 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
906 {
907     PeerInfo *peerInfo = context;
908     DFileSession *session = peerInfo->session;
909     QueueNode *queueNode = NULL;
910     struct sockaddr_in peerAddr;
911 
912     peerAddr = peerInfo->dstAddr;
913     queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
914     if (queueNode == NULL) {
915         return NSTACKX_ENOMEM;
916     }
917 
918     if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
919         DFILE_LOGE(TAG, "pthread mutex lock failed");
920         DestroyQueueNode(queueNode);
921         return NSTACKX_EFAILED;
922     }
923     ListInsertTail(&session->outboundQueue, &queueNode->list);
924     session->outboundQueueSize++;
925     if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
926         /* Don't need to free node, as it's mount to outboundQueue */
927         DFILE_LOGE(TAG, "pthread mutex unlock failed");
928         return NSTACKX_EFAILED;
929     }
930     SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
931     return (int32_t)len;
932 }
933 
BindMainLoopToTargetCpu(void)934 static void BindMainLoopToTargetCpu(void)
935 {
936     int32_t cpu;
937     int32_t cpus = GetCpuNum();
938     if (cpus >= FIRST_CPU_NUM_LEVEL) {
939         return;
940     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
941         cpu = CPU_IDX_0;
942     } else {
943         return;
944     }
945     StartThreadBindCore(cpu);
946 }
947 
/*
 * Compute the timeout for the next epoll wait of the main loop.
 * Returns 0 when the main loop reads actively and inbound data is already queued. Otherwise returns the
 * smallest non-negative timeout reported by the session's transfers, capped at DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t transTimeout = DFileTransGetTimeout((DFileTrans *)node);
        /* negative timeouts are skipped */
        if (transTimeout >= 0 && transTimeout < shortest) {
            shortest = transTimeout;
        }
    }

    return (shortest > DEFAULT_WAIT_TIME_MS) ? DEFAULT_WAIT_TIME_MS : shortest;
}
970 
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)971 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
972 {
973     List *tmp = NULL;
974     List *pos = NULL;
975     LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
976         DFileTrans *trans = (DFileTrans *)pos;
977         if (trans->transId != exceptTransId) {
978             DFileTransProcess(trans);
979         }
980     }
981 }
982 
NotifyBindPort(const DFileSession * session)983 static void NotifyBindPort(const DFileSession *session)
984 {
985     DFileMsg data;
986     int32_t socketNum;
987     (void)memset_s(&data, sizeof(data), 0, sizeof(data));
988     socketNum = 1;
989 
990     for (int i = 0; i < socketNum; i++) {
991         data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
992         data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
993     }
994     NotifyMsgRecver(session, DFILE_ON_BIND, &data);
995 }
996 
DFileMainLoop(void * arg)997 void *DFileMainLoop(void *arg)
998 {
999     DFileSession *session = arg;
1000     int32_t ret = NSTACKX_EOK;
1001     DFileMsg msgData;
1002     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
1003     uint8_t isBind = NSTACKX_FALSE;
1004     DFILE_LOGI(TAG, "main thread start");
1005     SetThreadName(DFFILE_MAIN_THREAD_NAME);
1006     SetMaximumPriorityForThread();
1007     SetTidToBindInfo(session, POS_MAIN_THERD_START);
1008     NotifyBindPort(session);
1009     while (!session->closeFlag) {
1010         int64_t minTimeout = GetEpollWaitTimeOut(session);
1011         ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
1012         if (ret == NSTACKX_EFAILED) {
1013             DFILE_LOGE(TAG, "epoll wait failed");
1014             break;
1015         }
1016         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1017             BindMainLoopToTargetCpu();
1018             isBind = NSTACKX_TRUE;
1019         }
1020         ProcessSessionTrans(session, 0);
1021         if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
1022             session->partReadFlag = NSTACKX_TRUE;
1023             ReadEventHandle(session);
1024             session->partReadFlag = NSTACKX_FALSE;
1025         }
1026     }
1027 
1028     if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
1029         msgData.errorCode = NSTACKX_EFAILED;
1030         NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
1031     }
1032 
1033     /* Notify sender thread to terminate */
1034     PostOutboundQueueWait(session);
1035 
1036     /* Unblock "select" and notify receiver thread to terminate */
1037     NotifyPipeEvent(session);
1038     return NULL;
1039 }
1040 
/* Reset the amended send rate of the peer to its plain send rate and log the resulting values. */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    peerInfo->amendSendRate = peerInfo->sendRate;

    DFILE_LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
         peerInfo->sendRate,
         peerInfo->sendFrameRate,
         peerInfo->amendSendRate);
}
1047 
/*
 * Periodically compute and log the send statistics of a client session.
 *
 * Nothing is done for non-client sessions or while less than peerInfo->rateStateInterval microseconds
 * have passed since peerInfo->startTime. Once the interval has elapsed:
 *  - the peer with socketIndex 0 updates the file manager IO read rate and clears its counters,
 *  - the send frame rate and the send rate in MB/s of the peer are recomputed,
 *  - the accumulated qdisc value is turned into an average,
 *  - everything is logged, the amended send rate is reset, the session and peer statistics are
 *    cleared and a new measurement interval is started.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    uint64_t sendCount;

    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* just calculate io rate; measureElapse is non-zero past this check, so the divisions are safe */
    if (measureElapse <= peerInfo->rateStateInterval) {
        return;
    }

    /* ONLY PEER 0 calculate io rate */
    sendCount = (uint64_t)peerInfo->sendCount;
    if (peerInfo->socketIndex == 0) {
        session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        DFILE_LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
            session->fileManager->sendListFullTimes);
        session->fileManager->sendListFullTimes = 0;
        session->fileManager->iorBytes = 0;
    }

    peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
    peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
        NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
    if (peerInfo->qdiscSearchNum != 0) {
        peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
    }

    /*
     * The pieces of the format string are concatenated by the compiler, so every piece ends with an
     * explicit separator. The 64-bit values are logged through the widened local / explicit casts so
     * that the arguments match the %llu conversions.
     */
    DFILE_LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu, "
              "measureElapse %llu sendFrameRate %u %uMB/s, "
              "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, "
              "noPendingData %u, "
              "min qdisc %u max qdisc %u search num %u ave qdisc %u, "
              "totalRecvBlocks %llu socket:%u "
              "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
         peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
         (unsigned long long)sendCount, (unsigned long long)measureElapse, peerInfo->sendFrameRate,
         peerInfo->sendCountRateMB,
         NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
         NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
         NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
         peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
         NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
         (unsigned long long)peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec,
         session->wlanCatagory);
    AmendPeerInfoSendRate(peerInfo);
    ClearSessionStats(session);
    ClearPeerinfoStats(peerInfo);
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
}
1099 
/*
 * Store the sum of the retry counts of all transfers of the session in peerInfo->allDtransRetryCount.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t total = 0;
    List *node = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        total += ((DFileTrans *)node)->retryCount;
    }

    peerInfo->allDtransRetryCount = total;
}
1111 
/*
 * Update the receive statistics of a server session for one received file DATA frame.
 *
 * Frames of other types, non-server sessions and unknown peers are ignored. For every counted frame
 * peerInfo->recvCount is incremented. Two independent measurement windows, both
 * peerInfo->rateStateInterval microseconds long, are evaluated:
 *  - session window (session->measureBefore): IO write rate, IO write count and the maximum IO write
 *    rate of the file manager are refreshed and the written byte counter is cleared,
 *  - peer window (peerInfo->startTime): receive frame rate and receive rate in MB/s of the peer are
 *    refreshed and recvCount is cleared.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    struct timespec nowTime;
    uint64_t recvCount;
    uint32_t timeOut;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER ||
        dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        return;
    }

    timeOut = (peerInfo->connType == CONNECT_TYPE_P2P) ? NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT :
        NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;

    peerInfo->recvCount++;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
    /* measureElapse is non-zero inside the branches below, so dividing by it is safe */
    if (measureElapse > peerInfo->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
        /*
         * dataFrameSize is a divisor here. If it is still 0 the previous iowCount is kept
         * instead of dividing by zero.
         */
        if (peerInfo->dataFrameSize != 0) {
            session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
                (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
        }
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        DFILE_LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
             session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    /* the peer window is measured against the same "now" that was sampled above */
    measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    if (measureElapse > peerInfo->rateStateInterval) {
        recvCount = (uint64_t)peerInfo->recvCount;
        peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        peerInfo->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1158 
CheckElapseTime(const struct timespec * before,uint64_t overRun)1159 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1160 {
1161     struct timespec now;
1162     ClockGetTime(CLOCK_MONOTONIC, &now);
1163     uint64_t elapseUs = GetTimeDiffUs(&now, before);
1164     elapseUs += overRun;
1165     if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1166         if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1167             DFILE_LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1168         }
1169         return 0;
1170     } else {
1171         uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1172         return delta;
1173     }
1174 }
1175 
BindClientSendThreadToTargetCpu(uint32_t idx)1176 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1177 {
1178     int32_t cpu;
1179     int32_t cpus = GetCpuNum();
1180     if (cpus >= FIRST_CPU_NUM_LEVEL) {
1181         return;
1182     } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1183         cpu = CPU_IDX_1 + (int32_t)idx;
1184     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1185         cpu = CPU_IDX_1;
1186     } else {
1187         return;
1188     }
1189     if (cpu > cpus) {
1190         cpu = cpus - 1;
1191     }
1192     StartThreadBindCore(cpu);
1193 }
1194 
/*
 * Prepare the rate measurement of a client sender thread: restart the measurement interval of the peer
 * served by socketIndex and bind the thread to its target CPU (thread offset 0).
 * Nothing is done for non-client sessions or when no peer exists for the socket index.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        return;
    }

    ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
    BindClientSendThreadToTargetCpu(0);
}
1206 
TerminateMainThreadFatalInner(void * arg)1207 static void TerminateMainThreadFatalInner(void *arg)
1208 {
1209     DFileSession *session = (DFileSession *)arg;
1210     DFileSessionSetFatalFlag(session);
1211 }
1212 
/*
 * Report a fatal error by posting TerminateMainThreadFatalInner as an event on the session's epoll fd.
 * If the event cannot be posted, the fatal flag is set directly from the calling thread instead.
 */
static void PostFatalEvent(DFileSession *session)
{
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session) == NSTACKX_EOK) {
        return;
    }

    DFILE_LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1220 
GetSocketWaitMs(uint32_t threadNum)1221 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1222 {
1223     if (threadNum > 1) {
1224         return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1225     }
1226     return DEFAULT_WAIT_TIME_MS;
1227 }
1228 
SetSendThreadName(uint32_t threadIdx)1229 static void SetSendThreadName(uint32_t threadIdx)
1230 {
1231     char name[MAX_THREAD_NAME_LEN] = {0};
1232     if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1233         DFILE_LOGE(TAG, "sprintf send thead name failed");
1234     }
1235     SetThreadName(name);
1236 }
1237 
/* Start-up arguments handed to one additional sender thread (see DFileAddiSenderHandle). */
typedef struct {
    struct DFileSession *session; /* session the thread sends data for */
    uint32_t threadIdx;           /* index of the thread in session->sendThreadPara */
} SendThreadCtx;
1242 
DFileAddiSenderHandle(void * arg)1243 static void *DFileAddiSenderHandle(void *arg)
1244 {
1245     SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
1246     struct DFileSession *session = sendThreadCtx->session;
1247     uint32_t threadIdx = sendThreadCtx->threadIdx;
1248     free(sendThreadCtx);
1249     int32_t ret = NSTACKX_EOK;
1250     DFILE_LOGI(TAG, "send thread %u start", threadIdx);
1251     SetSendThreadName(threadIdx);
1252     SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
1253     List unsent;
1254     uint8_t canWrite = NSTACKX_FALSE;
1255     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1256     BindClientSendThreadToTargetCpu(threadIdx + 1);
1257     ListInitHead(&unsent);
1258     while (!session->addiSenderCloseFlag) {
1259         if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
1260             NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1261             SemWait(&session->sendThreadPara[threadIdx].sendWait);
1262             if (session->addiSenderCloseFlag) {
1263                 break;
1264             }
1265         }
1266         if (ret == NSTACKX_EAGAIN) {
1267             ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
1268             if (ret != NSTACKX_EOK || session->closeFlag) {
1269                 break;
1270             }
1271             if (!canWrite) {
1272                 ret = NSTACKX_EAGAIN;
1273                 continue;
1274             }
1275         }
1276         ret = SendDataFrame(session, &unsent, threadIdx, 0);
1277         if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
1278             break;
1279         }
1280         if (ret == NSTACKX_EAGAIN) {
1281             continue;
1282         }
1283         SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
1284     }
1285     if (ret < 0 && ret != NSTACKX_EAGAIN) {
1286         PostFatalEvent(session);
1287     }
1288     DestroyIovList(&unsent, session, threadIdx);
1289     return NULL;
1290 }
1291 
CloseAddiSendThread(struct DFileSession * session)1292 static void CloseAddiSendThread(struct DFileSession *session)
1293 {
1294     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1295         return;
1296     }
1297     session->addiSenderCloseFlag = NSTACKX_TRUE;
1298     for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1299         SemPost(&session->sendThreadPara[i].sendWait);
1300         SemPost(&session->sendThreadPara[i].semNewCycle);
1301         PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1302         SemDestroy(&session->sendThreadPara[i].sendWait);
1303         SemDestroy(&session->sendThreadPara[i].semNewCycle);
1304     }
1305 }
1306 
ErrorHandle(struct DFileSession * session,uint16_t cnt)1307 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1308 {
1309     while (cnt > 0) {
1310         SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1311         SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1312         PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1313         SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1314         SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1315         cnt--;
1316     }
1317 }
1318 
CreateAddiSendThread(struct DFileSession * session)1319 static int32_t CreateAddiSendThread(struct DFileSession *session)
1320 {
1321     uint16_t i;
1322     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1323         return NSTACKX_EOK;
1324     }
1325     session->addiSenderCloseFlag = NSTACKX_FALSE;
1326     for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1327         SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1328         if (sendThreadCtx == NULL) {
1329             goto L_ERR_SENDER_THREAD;
1330         }
1331 
1332         if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1333             free(sendThreadCtx);
1334             goto L_ERR_SENDER_THREAD;
1335         }
1336 
1337         if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1338             SemDestroy(&session->sendThreadPara[i].sendWait);
1339             free(sendThreadCtx);
1340             goto L_ERR_SENDER_THREAD;
1341         }
1342 
1343         sendThreadCtx->session = session;
1344         sendThreadCtx->threadIdx = i;
1345         if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1346             DFILE_LOGE(TAG, "Create sender thread failed");
1347             SemDestroy(&session->sendThreadPara[i].sendWait);
1348             SemDestroy(&session->sendThreadPara[i].semNewCycle);
1349             free(sendThreadCtx);
1350             goto L_ERR_SENDER_THREAD;
1351         }
1352     }
1353     return NSTACKX_EOK;
1354 
1355 L_ERR_SENDER_THREAD:
1356     session->addiSenderCloseFlag = NSTACKX_TRUE;
1357     ErrorHandle(session, i);
1358     return NSTACKX_EFAILED;
1359 }
1360 
/*
 * Fold one qdisc sample into the peer statistics: minimum, maximum, number of samples and running sum
 * (qdiscAveLeft holds the sum here). A stored minimum of 0 is treated as "not set yet" and is always
 * replaced by the sample.
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }

    if (peerInfo->qdiscMaxLeft < qDiscLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1374 
/*
 * Limit the amended send rate of the peer by the value GetQdiscLen reports for the root queue of the
 * local interface.
 *
 * The reported value is recorded in the qdisc statistics and divided by the number of MTUs per data
 * frame (at least 1); the amended send rate becomes the smaller of that quotient and the send rate.
 * Nothing is changed when the qdisc query fails or when no MTU is in use.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t qDiscLeft;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qDiscLeft) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, qDiscLeft);

    uint32_t qdiscRate = qDiscLeft / mtusPerFrame;
    if (peerInfo->sendRate >= qdiscRate) {
        peerInfo->amendSendRate = (int32_t)qdiscRate;
    } else {
        peerInfo->amendSendRate = peerInfo->sendRate;
    }
}
1398 
/*
 * Finish one send cycle of a client sender thread.
 *
 * The cycle of socketIndex is marked as not running. If the peer sent at least curAmendSendRate frames
 * in this interval (and that rate is positive), the thread is paced with CheckElapseTime; a pending
 * decreaseStatus first discards the accumulated overrun. A cycle whose resulting overrun is 0 is
 * counted in session->sleepTimes. Finally the interval send counter is cleared and the amended send
 * rate is refreshed from the qdisc state.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    uint8_t quotaReached = (curAmendSendRate > 0) &&
        (peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate);
    if (quotaReached) {
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->overRun = 0;
            peerInfo->decreaseStatus = 0;
        }

        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1417 
/*
 * Run one send cycle of the sender thread that serves socketIndex.
 *
 * 1. With the TCP capability and frames left over in `unsent`, those are sent first (sendRemain is set
 *    for the duration of that call); NSTACKX_EFAILED, NSTACKX_EAGAIN or a closing session end the cycle.
 * 2. Queued outbound frames are sent with SendOutboundFrame. Non-client sessions stop here.
 * 3. Client sessions with a negotiated MTU then send data frames if step 2 succeeded, update the rate
 *    statistics, pace the thread and release the additional sender threads for their next cycle.
 *
 * Returns the result of the last send call, or NSTACKX_EFAILED when no peer exists for socketIndex.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
                                     struct timespec *before, uint8_t socketIndex)
{
    int32_t ret;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        session->sendRemain = 1;
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if (ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN || session->closeFlag) {
            DFILE_LOGI(TAG, "ret is %d", ret);
            return ret;
        }
    }

    ret = SendOutboundFrame(session, preQueueNode);
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return ret;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    /* no MTU in use: no data frames are sent in this cycle */
    if (peer->mtuInuse == 0) {
        return ret;
    }

    if (ret == NSTACKX_EOK) {
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }

    /* Sample the rate before DFileSendCalculateRate may reset it. */
    int32_t rateBeforeCalc = peer->amendSendRate;
    DFileSendCalculateRate(session, peer);

    if (ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN || session->closeFlag) {
        DFILE_LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", ret, peer->intervalSendCount);
        return ret;
    }

    UpdateInfoAfterSend(session, peer, rateBeforeCalc, before, socketIndex);

    /* let every additional sender thread start its next cycle */
    for (uint16_t idx = 0; idx < session->clientSendThreadNum - 1; idx++) {
        SemPost(&session->sendThreadPara[idx].semNewCycle);
    }
    return ret;
}
1467 
/*
 * Block on outboundQueueWait[socketIndex] when there is nothing to send.
 *
 * "Nothing to send" means: no pending queue node, an empty "unsent" list, no pending data reported by
 * the file manager and an outbound queue size of zero. noPendingDataTimes is incremented before waiting.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL || !ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager) || session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1476 
/*
 * One-time preparation of a sender thread: set its name, refresh the measure time, raise its priority
 * and record its tid in the bind info, provided the computed slot lies inside the send-thread range.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t bindPos = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    if (bindPos < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, bindPos);
    }
}
1489 
/*
 * Tear-down path of a sender thread.
 *
 * Logs the total number of sent blocks, calls CloseAddiSendThread(), destroys the "unsent" iov list
 * and the pending queue node, and finally frees the thread parameter "arg".
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    DFILE_LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    CloseAddiSendThread(session);
    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    /* "arg" is released here, after which the caller must not touch it. */
    free(arg);
}
1499 
DFileSenderHandle(void * arg)1500 void *DFileSenderHandle(void *arg)
1501 {
1502     DFileSession *session = ((SenderThreadPara*)arg)->session;
1503     uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1504     QueueNode *queueNode = NULL;
1505     List unsent;
1506     int32_t ret = NSTACKX_EOK;
1507     struct timespec before;
1508     uint8_t canWrite = NSTACKX_FALSE;
1509     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1510     uint8_t isBind = NSTACKX_FALSE;
1511 
1512     if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1513         PostFatalEvent(session);
1514         return NULL;
1515     }
1516     ListInitHead(&unsent);
1517     DFileSenderPre(session, socketIndex);
1518     while (!session->closeFlag) {
1519         WaitNewSendData(queueNode, &unsent, session, socketIndex);
1520         if (session->closeFlag) {
1521             break;
1522         }
1523         if (!session->cycleRunning[socketIndex]) {
1524             ClockGetTime(CLOCK_MONOTONIC, &before);
1525         }
1526         if (ret == NSTACKX_EAGAIN) {
1527             ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1528             if (ret != NSTACKX_EOK || session->closeFlag) {
1529                 break;
1530             }
1531             if (!canWrite) {
1532                 ret = NSTACKX_EAGAIN;
1533                 continue;
1534             }
1535         }
1536         if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1537             session->transFlag == NSTACKX_TRUE) {
1538             BindClientSendThreadToTargetCpu(0);
1539             isBind = NSTACKX_TRUE;
1540         }
1541         ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1542         if (ret < 0 && ret != NSTACKX_EAGAIN) {
1543             PeerShuttedEvent();
1544             PostFatalEvent(session);
1545             break;
1546         }
1547     }
1548     DFileSenderClose(session, queueNode, &unsent, arg);
1549     return NULL;
1550 }
1551 
/*
 * Unlink and destroy every queue node still linked on "head".
 * A NULL or empty list is a no-op.
 */
static void ClearDFileFrameList(List *head)
{
    if (head == NULL) {
        return;
    }
    if (ListIsEmpty(head)) {
        return;
    }

    List *cursor = NULL;
    List *next = NULL;
    LIST_FOR_EACH_SAFE(cursor, next, head) {
        /* The list entry is cast straight to its QueueNode, as the original code does. */
        QueueNode *stale = (QueueNode *)cursor;
        ListRemoveNode(&stale->list);
        DestroyQueueNode(stale);
    }
}
1565 
CheckDfileType(uint8_t type)1566 static inline int32_t CheckDfileType(uint8_t type)
1567 {
1568     if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1569         return NSTACKX_EFAILED;
1570     }
1571     return NSTACKX_EOK;
1572 }
1573 
/*
 * Validate a session type: it must lie in [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER].
 * Returns NSTACKX_EOK for a valid type, NSTACKX_EFAILED otherwise.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    if (type >= DFILE_SESSION_TYPE_CLIENT && type <= DFILE_SESSION_TYPE_SERVER) {
        return NSTACKX_EOK;
    }
    return NSTACKX_EFAILED;
}
1581 
/*
 * Pop frames from "head" and dispatch each one by its header type until the list is empty or the
 * session is closing. Whatever is left on the list afterwards is destroyed.
 *
 * Frame memory ownership: the frame buffer is freed here unless the frame is a FILE_DATA frame that
 * was handled successfully, in which case the buffer has been handed to the file manager.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    int32_t handleFrameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }

        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *peerAddr = &node->peerAddr;
        /* Remember the type now: "frame" may be invalid once a handler has failed on it. */
        uint8_t type = frame->header.type;

        /* Reject frames whose declared length cannot fit into a maximum-size frame. */
        uint16_t declaredLen = ntohs(frame->header.length);
        if (declaredLen > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            DFILE_LOGE(TAG, "header length %u is too big", declaredLen);
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(type) != NSTACKX_EOK || CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            handleFrameRet = NSTACKX_EFAILED;
        } else if (type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, peerAddr, node->socketIndex);
        } else if (type == NSTACKX_DFILE_RST_FRAME && frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, peerAddr);
        } else if (type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, peerAddr);
        } else {
            /* Every other frame, including RST frames with a non-zero transId, takes the generic path. */
            handleFrameRet = DFileSessionHandleFrame(session, frame, peerAddr);
        }

        /* For a successfully handled FILE_DATA frame the buffer now belongs to the file manager. */
        if (handleFrameRet != NSTACKX_EOK || type != NSTACKX_DFILE_FILE_DATA_FRAME) {
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }

    ClearDFileFrameList(head);
}
1628 
ReadEventHandle(void * arg)1629 static void ReadEventHandle(void *arg)
1630 {
1631     DFileSession *session = arg;
1632     List newHead;
1633     ListInitHead(&newHead);
1634     struct timespec before, now;
1635     if (!session->partReadFlag) {
1636         NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1637     } else {
1638         ClockGetTime(CLOCK_MONOTONIC, &before);
1639     }
1640     while (session->inboundQueueSize && !session->closeFlag) {
1641         if (session->partReadFlag) {
1642             ClockGetTime(CLOCK_MONOTONIC, &now);
1643             if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
1644                 break;
1645             }
1646         }
1647         if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1648             DFILE_LOGE(TAG, "PthreadMutexLock error");
1649             return;
1650         }
1651         ListMove(&session->inboundQueue, &newHead);
1652         session->recvBlockNumInner += session->inboundQueueSize;
1653         session->inboundQueueSize = 0;
1654         if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1655             DFILE_LOGE(TAG, "PthreadMutexUnlock error");
1656             break;
1657         }
1658         ProcessDFileFrameList(session, &newHead);
1659     }
1660     ClearDFileFrameList(&newHead);
1661 }
1662 
/*
 * Wrap a received frame into a queue node and append it to the session's inbound queue.
 *
 * Returns:
 *   NSTACKX_EOK     - node queued; inboundQueueSize and recvBlockNumDirect were incremented.
 *   NSTACKX_ENOMEM  - the queue already holds more than MAX_RECVBUF_COUNT nodes, or node creation failed.
 *   NSTACKX_EFAILED - locking or unlocking inboundQueueLock failed.
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
                                    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
        if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
            DFILE_LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
        }
        return NSTACKX_ENOMEM;
    }

    QueueNode *newNode = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    if (newNode == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        /* Not queued yet, so the node is still ours to destroy. */
        DestroyQueueNode(newNode);
        return NSTACKX_EFAILED;
    }
    ListInsertTail(&session->inboundQueue, &newNode->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* On unlock failure the node is already on the list, so it must not be destroyed here. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
1692 
/* For server sessions, store the current monotonic time in session->measureBefore. */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1699 
/*
 * Bind the calling thread to a CPU core.
 *
 * Non-server sessions are always bound to CPU_IDX_2. For server sessions the core depends on the CPU
 * count: no binding at or above FIRST_CPU_NUM_LEVEL, CPU_IDX_1 at or above SECOND_CPU_NUM_LEVEL,
 * CPU_IDX_0 at or above THIRD_CPU_NUM_LEVEL, and no binding below that.
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t cpuCount = GetCpuNum();
    int32_t targetCpu;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        targetCpu = CPU_IDX_2;
        StartThreadBindCore(targetCpu);
        return;
    }

    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_1;
    } else if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_0;
    } else {
        return;
    }
    StartThreadBindCore(targetCpu);
}
1720 
/*
 * Post a ReadEventHandle event to the session's event chain, unless MAX_UNPROCESSED_READ_EVENT_COUNT
 * read events are already outstanding.
 *
 * mainLoopActiveReadFlag is set to NSTACKX_TRUE when posting fails and to NSTACKX_FALSE when it
 * succeeds; the outstanding-event counter is rolled back on failure.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    DFILE_LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1735 
/*
 * Handle one received buffer: decode it as a DFile frame, queue it for processing, post a read event
 * and update the receive-rate statistics.
 *
 * Returns NSTACKX_EOK when the frame was queued, and also when it was dropped because it could not be
 * decoded or because the inbound queue reported NSTACKX_ENOMEM. Returns NSTACKX_EFAILED for any other
 * queueing error.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
                                     struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *decoded = NULL;
    if (DecodeDFileFrame(buf, bufLen, &decoded) != NSTACKX_EOK) {
        /* A frame that fails decoding is dropped without reporting an error to the caller. */
        DFILE_LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    int32_t queueRet = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    switch (queueRet) {
        case NSTACKX_EOK:
            break;
        case NSTACKX_ENOMEM:
            /* The frame is dropped silently when the queue cannot take it. */
            return NSTACKX_EOK;
        default:
            DFILE_LOGE(TAG, "frame add in bound queue failed :%d", queueRet);
            return NSTACKX_EFAILED;
    }

    PostReadEventToMainLoop(session);
    DFileRecvCalculateRate(session, decoded, peerAddr);
    return NSTACKX_EOK;
}
1758 
/*
 * One-time preparation of the receiver thread: name the thread, refresh the session measure time,
 * raise the thread priority and record the tid at POS_RECV_THERD_START in the bind info.
 */
static void DFileRecverPre(DFileSession *session)
{
    SetThreadName(DFFILE_RECV_THREAD_NAME);

    DFileReceiverUpdateSessionMeasureTime(session);

    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1766 
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for a read event and report it through "canRead".
 * The accepted socket is used once acceptFlag is non-zero; before that, socket[0] is used.
 * Returns the result of WaitSocketEvent().
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    if (session->acceptFlag != 0) {
        return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
    }
    return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1775 
/* Receive from the session socket; thin wrapper that forwards to DFileSocketRecvSP(). */
int32_t DFileSocketRecv(DFileSession *session)
{
    int32_t recvRet = DFileSocketRecvSP(session);
    return recvRet;
}
1780 
/*
 * Accept a connection on socket[0] and store it in session->acceptSocket.
 *
 * On success acceptFlag is set to 1, SetTcpKeepAlive() is called on the accepted socket,
 * AcceptSocketEvent() is called, and NSTACKX_EOK is returned. Returns NSTACKX_EFAILED when the accept fails.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        DFILE_LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    DFILE_LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);

    AcceptSocketEvent();
    return NSTACKX_EOK;
}
1797 
DFileReceiverHandle(void * arg)1798 void *DFileReceiverHandle(void *arg)
1799 {
1800     DFileSession *session = arg;
1801     uint8_t canRead = NSTACKX_FALSE;
1802     int32_t ret = NSTACKX_EAGAIN;
1803     uint8_t isBind = NSTACKX_FALSE;
1804 
1805     DFILE_LOGI(TAG, "recv thread start");
1806     DFileRecverPre(session);
1807     while (!session->closeFlag) {
1808         if (ret == NSTACKX_EAGAIN || !canRead) {
1809             ret = RcverWaitSocket(session, &canRead);
1810             if (ret != NSTACKX_EOK || session->closeFlag) {
1811                 break;
1812             }
1813         }
1814         if (!canRead) {
1815             continue;
1816         }
1817         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1818             BindServerRecvThreadToTargetCpu(session);
1819             isBind = NSTACKX_TRUE;
1820         }
1821 
1822         ret = DFileSocketRecv(session);
1823         if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1824             PeerShuttedEvent();
1825             break;
1826         }
1827     }
1828     DFILE_LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect,
1829             session->recvBlockNumInner);
1830     if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1831         PostFatalEvent(session);
1832     }
1833 
1834     DFILE_LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1835     return NULL;
1836 }
1837 
DFileControlHandle(void * arg)1838 void *DFileControlHandle(void *arg)
1839 {
1840     SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1841     DFileSession *session = arg;
1842     if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1843         DFileSenderControlHandle(session);
1844     } else {
1845         DFileReceiverControlHandle(session);
1846     }
1847     return NULL;
1848 }
1849 
/*
 * Replace every entry of fileListInfo->files[] with its realpath()-resolved form.
 *
 * Each resolved string is allocated by realpath() and replaces the original entry, which is freed.
 * Processing stops at the first entry that cannot be resolved; entries before it stay resolved.
 * Returns NSTACKX_EOK when every entry was resolved, NSTACKX_EFAILED otherwise.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    for (uint32_t idx = 0; idx < fileListInfo->fileNum; idx++) {
        char *rawName = fileListInfo->files[idx];
        char *resolved = realpath(rawName, NULL);
        if (resolved == NULL) {
            DFILE_LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        fileListInfo->files[idx] = resolved;
        free(rawName);
    }
    return NSTACKX_EOK;
}
1869 
/*
 * Free a FileListInfo container: the files[] array itself (not the strings it points to),
 * the remotePath string and the structure.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    free(fileListInfo->files);
    fileListInfo->files = NULL;

    /* free(NULL) is a no-op, so no explicit NULL guard is needed. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo);
}
1880 
/*
 * Create a sending transfer for "fileListInfo" and link it into the session's transfer chain.
 *
 * The next transfer id is lastDFileTransId + 1, skipping 0 on wrap-around. On success the transfer is
 * appended to dFileTransChain, lastDFileTransId and the matching processing counter are updated, and
 * the fileListInfo container is freed here (its files[] entries are reused by the transfer).
 * On failure the transfer, if already created, is destroyed and fileListInfo is left to the caller.
 *
 * Returns NSTACKX_EOK on success; NSTACKX_ENOMEM when the transfer cannot be created; NSTACKX_EFAILED
 * when no peer info is available, path resolution fails or the extra info cannot be added; otherwise
 * the error returned by DFileTransSendFiles().
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    uint16_t transId = session->lastDFileTransId + 1;
    if (transId == 0) { /* overflow */
        transId = 1;
    }

    PeerInfo *peerInfo = TransSelectPeerInfo(session);
    if (peerInfo == NULL) {
        /* peerInfo->mtuInuse is read below, so bail out instead of dereferencing a NULL peer. */
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
    if (trans == NULL) {
        DFILE_LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to set the MTU is only logged; the transfer continues. */
    if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    int32_t ret = DFileTransSendFiles(trans, fileListInfo);
    if (ret != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        DFILE_LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
        return ret;
    }
    ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
                                 fileListInfo->userData);
    if (ret != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    trans->fileList->tarFlag = fileListInfo->tarFlag;
    trans->fileList->smallFlag = fileListInfo->smallFlag;
    trans->fileList->tarFile = fileListInfo->tarFile;
    trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;

    /* userData was passed to DFileTransAddExtraInfo() above; drop this reference to it. */
    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    session->lastDFileTransId = transId;
    if (fileListInfo->smallFlag == NSTACKX_TRUE) {
        session->smallListProcessingCnt++;
    } else {
        session->fileListProcessingCnt++;
    }
    /* Elements in fileListInfo->files[] are reused by the transfer, so only the containers are freed. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1932 
/*
 * Start a transfer for "fileListInfo" while holding session->transIdLock.
 *
 * Returns NSTACKX_EFAILED when the lock cannot be taken; otherwise the result of
 * DFileStartTransInner(). An unlock failure is only logged.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    if (PthreadMutexLock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    int32_t startRet = DFileStartTransInner(session, fileListInfo);

    int32_t unlockRet = PthreadMutexUnlock(&session->transIdLock);
    if (unlockRet != 0) {
        DFILE_LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1945 
TerminateMainThreadInner(void * arg)1946 void TerminateMainThreadInner(void *arg)
1947 {
1948     DFileSession *session = (DFileSession *)arg;
1949     DFileSessionSetTerminateFlag(session);
1950 }
1951 
/*
 * Start the session threads in order: main loop, sender, receiver, control.
 *
 * If a later thread cannot be created, the threads started before it are stopped and joined in
 * reverse order through the fall-through labels below.
 *
 * Returns NSTACKX_EOK when all threads are running, NSTACKX_EFAILED otherwise.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        DFILE_LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        DFILE_LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        DFILE_LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;
L_ERR_CONTROL_THREAD:
    /* The control thread was never created; the thread to reap at this stage is the receiver. */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /*
     * Wake the sender before joining it: it can be blocked in SemWait() on outboundQueueWait
     * (see WaitNewSendData), in which case joining first would never return.
     */
    PostOutboundQueueWait(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post an event so the main loop wakes up and sees the terminate flag. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1992 
/*
 * File manager message callback ("context" is the DFileSession).
 * FILE_MANAGER_INNER_ERROR is logged and raised as a fatal event; FILE_MANAGER_IN_PROGRESS triggers a
 * progress notice. Every other message type is ignored.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            DFILE_LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
2005 
/*
 * Create the file manager of a session and store it in session->fileManager.
 *
 * Validation performed before creation:
 *  - "session" must not be NULL (NSTACKX_EINVAL);
 *  - a sender must use CONNECT_TYPE_P2P or CONNECT_TYPE_WLAN (NSTACKX_EINVAL);
 *  - when keyLen is non-zero, it must be AES_128_KEY_LENGTH or CHACHA20_KEY_LENGTH, "key" must not be
 *    NULL and crypto support must be included (NSTACKX_EFAILED).
 *
 * Returns NSTACKX_EOK on success, NSTACKX_EFAILED when FileManagerCreate() fails.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    FileManagerMsgPara msgPara;

    if (session == NULL) {
        DFILE_LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }

    uint8_t connTypeAllowed = (connType == CONNECT_TYPE_P2P || connType == CONNECT_TYPE_WLAN);
    if (isSender && !connTypeAllowed) {
        DFILE_LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }

    if (keyLen > 0) {
        uint8_t keyLenAllowed = (keyLen == AES_128_KEY_LENGTH || keyLen == CHACHA20_KEY_LENGTH);
        if (!keyLenAllowed || key == NULL) {
            DFILE_LOGE(TAG, "error key or key len");
            return NSTACKX_EFAILED;
        }
        if (!IsCryptoIncluded()) {
            DFILE_LOGE(TAG, "crypto is not opened");
            return NSTACKX_EFAILED;
        }
    }

    /* Messages from the file manager are delivered to FileManagerMsgHandle with the session as context. */
    msgPara.context = session;
    msgPara.msgReceiver = FileManagerMsgHandle;
    msgPara.eventNodeChain = &session->eventNodeChain;
    msgPara.epollfd = session->epollfd;

    session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
    if (session->fileManager == NULL) {
        DFILE_LOGE(TAG, "create filemanager failed");
        return NSTACKX_EFAILED;
    }
    return NSTACKX_EOK;
}
2039 
/*
 * Close both ends of the receiver pipe (PIPE_OUT first, then PIPE_IN) and mark them invalid.
 * Ends that are already INVALID_PIPE_DESC are left untouched.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const int pipeEnds[] = { PIPE_OUT, PIPE_IN };

    for (uint32_t idx = 0; idx < sizeof(pipeEnds) / sizeof(pipeEnds[0]); idx++) {
        int end = pipeEnds[idx];
        if (session->receiverPipe[end] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[end]);
        session->receiverPipe[end] = INVALID_PIPE_DESC;
    }
}
2051 
/* Install "func" as the global DFile event callback; "softObj" is not used. */
void DFileSetEvent(void *softObj, DFileEventFunc func)
{
    (void)softObj;
    g_dfileEventFunc = func;
}
2057 
NSTACKX_DFileAssembleFunc(void * softObj,const DFileEvent * info)2058 void NSTACKX_DFileAssembleFunc(void *softObj, const DFileEvent *info)
2059 {
2060     if (g_dfileEventFunc != NULL) {
2061         g_dfileEventFunc(softObj, info);
2062     }
2063 }
2064