• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nstackx_dfile_session.h"
17 
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_dfile_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 #include "nstackx_dfile_dfx.h"
38 
39 #define TAG "nStackXDFile"
40 
/*
 * File-local tuning constants.
 * NOTE(review): the *_TIMEOUT values are handed to TimerStart()/TimerSetTimeout() below;
 * they are presumably milliseconds - confirm against the timer API.
 */
41 #define DEFAULT_WAIT_TIME_MS               1000
42 #define MULTI_THREADS_SOCKET_WAIT_TIME_MS  1
/* Setting-timer timeout used by the client when CapsTcp() is false. */
43 #define DEFAULT_NEGOTIATE_TIMEOUT          1000
/* Setting-timer timeout used by the client when CapsTcp() is true. */
44 #define TCP_NEGOTIATE_TIMEOUT              10000
/* Timeout of the server-side setting timer; on expiry the PeerInfo is released. */
45 #define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
/* Limit on client Setting retries and on Setting frames the server accepts from one peer. */
46 #define MAX_NEGOTIATE_TIMEOUT_COUNT        3
47 #define MAX_UNPROCESSED_READ_EVENT_COUNT   3
48 #define MAX_RECVBUF_COUNT                  130000
49 #define MAX_NOMEM_PRINT                    10000
50 
51 static void ReadEventHandle(void *arg);
52 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
53 DFileEventFunc g_dfileEventFunc;
54 
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)55 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
56     const struct sockaddr_in *peerAddr, uint8_t socketIndex)
57 {
58     QueueNode *queueNode = NULL;
59 
60     if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
61         return NULL;
62     }
63 
64     queueNode = calloc(1, sizeof(QueueNode));
65     if (queueNode == NULL) {
66         return NULL;
67     }
68 
69     queueNode->frame = calloc(1, length);
70     if (queueNode->frame == NULL) {
71         free(queueNode);
72         return NULL;
73     }
74     queueNode->length = length;
75     queueNode->sendLen = 0;
76 
77     if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
78         DestroyQueueNode(queueNode);
79         return NULL;
80     }
81     if (peerAddr != NULL) {
82         if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
83             DestroyQueueNode(queueNode);
84             return NULL;
85         }
86     }
87     queueNode->socketIndex = socketIndex;
88 
89     return queueNode;
90 }
91 
/* Release a queue node together with its frame buffer. A NULL node is ignored. */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }
    free(queueNode->frame);
    free(queueNode);
}
99 
/*
 * Forward a message to the session's registered msgReceiver callback.
 * Nothing is delivered (only an info log is written) when the session or its
 * callback is NULL.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        DFILE_LOGI(TAG, "session is NULL");
    } else if (session->msgReceiver == NULL) {
        DFILE_LOGI(TAG, "msgReceiver is NULL");
    } else {
        session->msgReceiver(session->sessionId, msgType, msg);
    }
}
114 
/*
 * Reset the session's rate-measurement state (bytesTransferred, transCount, startTs).
 *
 * The reset is skipped while work is still outstanding: a client session with a
 * non-empty pending list (or small-file list when NSTACKX_SMALL_FILE_SUPPORT is
 * defined), or any session whose transfer chain is not empty.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    DFILE_LOGI(TAG, "begin to calculate transfer rate");
    session->transCount = 0;
    session->bytesTransferred = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
135 
#ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start the next queued small-file list.
 *
 * Small lists are only considered when a normal file list is being processed or
 * no normal file list is pending. Entries are popped one by one until one starts
 * successfully; each entry that fails to start is reported through
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    const int smallAllowed = (session->fileListProcessingCnt > 0) || (session->fileListPendingCnt == 0);
    if (!smallAllowed) {
        return NSTACKX_FALSE;
    }

    while (!ListIsEmpty(&session->smallFileLists)) {
        FileListInfo *entry = (FileListInfo *)ListPopFront(&session->smallFileLists);
        session->smallListPendingCnt--;
        if (entry == NULL) {
            continue;
        }

        int32_t ret = DFileStartTrans(session, entry);
        if (ret == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", ret);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = ret;
        failMsg.fileList.files = (const char **)entry->files;
        failMsg.fileList.fileNum = entry->fileNum;
        failMsg.fileList.userData = entry->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(entry);
    }
    return NSTACKX_FALSE;
}
#endif
165 
/*
 * Start the next pending file list.
 *
 * Entries are popped from pendingFileLists until one starts successfully or the
 * list is exhausted. Each entry that fails to start is reported through
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 */
static void SendPendingList(DFileSession *session)
{
    while (!ListIsEmpty(&session->pendingFileLists)) {
        FileListInfo *entry = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        session->fileListPendingCnt--;
        if (entry == NULL) {
            continue;
        }

        int32_t ret = DFileStartTrans(session, entry);
        if (ret == NSTACKX_EOK) {
            return;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", ret);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = ret;
        failMsg.fileList.files = (const char **)entry->files;
        failMsg.fileList.fileNum = entry->fileNum;
        failMsg.fileList.userData = entry->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(entry);
    }
}
190 
/*
 * Start the next queued file list. With NSTACKX_SMALL_FILE_SUPPORT the small-file
 * queue is tried first and the pending queue is used only when no small list was
 * started; without it only the pending queue is used.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
206 
/*
 * Report session-wide progress through DFILE_ON_SESSION_IN_PROGRESS.
 * The notification is sent only when both counters can be read from the file
 * manager and 0 < bytesTransferred <= totalBytes.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }

    const int plausible = (progress.transferUpdate.bytesTransferred <= progress.transferUpdate.totalBytes) &&
        (progress.transferUpdate.bytesTransferred > 0);
    if (plausible) {
        NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
    }
}
218 
/*
 * Fill msg->transferUpdate for the terminal per-file events (FILE_RECEIVED,
 * FILE_RECEIVE_FAIL, FILE_SEND_FAIL, FILE_SENT); all other message types and NULL
 * arguments are ignored.
 *
 * For FILE_RECEIVED / FILE_SENT the transfer's own total is used for both counters
 * when it is non-zero; otherwise the counters come from the file manager, and the
 * function returns without filling them if that query fails. On FILE_SENT a
 * DFILE_ON_TRANS_IN_PROGRESS notification is raised, using the real trans id when
 * the file list carries vtransFlag.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
                                 DFileTransMsgType msgType, DFileTransMsg *msg)
{
    if (session == NULL || dFileTrans == NULL || msg == NULL) {
        return;
    }
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVED:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
            break;
        default:
            return;
    }

    msg->transferUpdate.transId = dFileTrans->transId;

    uint64_t total = 0;
    uint64_t transferred = 0;
    uint8_t resolved = NSTACKX_FALSE;

    if (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT) {
        total = DFileTransGetTotalBytes(dFileTrans);
        if (total > 0) {
            /* A completed transfer has moved everything it was asked to move. */
            transferred = total;
            resolved = NSTACKX_TRUE;
        }
    }

    if (!resolved &&
        FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &transferred) !=
        NSTACKX_EOK) {
        return;
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = transferred;
    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
257 
/*
 * Wake the sender for the given socket by posting its outbound-queue semaphore.
 * When the transfer is a sender and the session has more than one client send
 * thread, the sendWait semaphore of the first (clientSendThreadNum - 1)
 * sendThreadPara entries is posted as well.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);

    if (!isSender) {
        return;
    }
    if (session->clientSendThreadNum <= 1) {
        return;
    }

    uint16_t idx = 0;
    while (idx < session->clientSendThreadNum - 1) {
        SemPost(&session->sendThreadPara[idx].sendWait);
        idx++;
    }
}
267 
/*
 * Account a finished transfer and, once the session is idle, report the overall rate.
 *
 * Only DFILE_TRANS_MSG_FILE_SENT and DFILE_TRANS_MSG_END are counted: totalBytes is
 * added to session->bytesTransferred (saturating at UINT64_MAX) and transCount is
 * incremented. The rate is computed only when the transfer chain is empty and, for a
 * client session, no pending list (or small-file list with NSTACKX_SMALL_FILE_SUPPORT)
 * remains. It is then logged, delivered through DFILE_ON_SESSION_TRANSFER_RATE and
 * passed to TransferCompleteEvent(). Nothing is reported when the elapsed time is 0 ms.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }
    if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
        session->bytesTransferred += totalBytes;
    } else {
        session->bytesTransferred = UINT64_MAX;
    }
    session->transCount++;
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
        || !ListIsEmpty(&session->smallFileLists))) {
        return;
    }
#else
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
        return;
    }
#endif
    struct timespec endTs;
    ClockGetTime(CLOCK_MONOTONIC, &endTs);

    uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
    if (spendTime == 0) {
        /* Avoid dividing by zero below. */
        return;
    }
    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
    /* uint64_t is not necessarily unsigned long long; cast so the argument matches %llu. */
    DFILE_LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
        session->transCount, (unsigned long long)session->bytesTransferred, spendTime, rate);
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    /* Converting an out-of-range double to uint32_t is undefined, so saturate first. */
    msgData.rate = (rate >= (double)UINT32_MAX) ? UINT32_MAX : (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);

    TransferCompleteEvent(rate);
}
309 
/*
 * Tear down a transfer once it has reached a terminal event (FILE_RECEIVE_FAIL,
 * FILE_SENT, FILE_SEND_FAIL or END); other message types are ignored.
 *
 * The trans id is marked STATE_TRANS_DONE, the owning peer's currentTransCount is
 * decremented, and the transfer is unlinked and destroyed. A client session then
 * decrements the matching processing counter and starts the next queued list.
 * Finally the session transfer-rate accounting is updated.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_END:
            break;
        default:
            return;
    }

    /* Everything still needed from the transfer is read before DFileTransDestroy(). */
    const uint8_t wasSmall = dFileTrans->fileList->smallFlag;
    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans id state fail");
    }
    PeerInfo *owner = (PeerInfo *)dFileTrans->context;
    owner->currentTransCount--;
    ListRemoveNode(&dFileTrans->list);
    const uint64_t transBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (wasSmall == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }
    CalculateSessionTransferRate(session, transBytes, msgType);
}
333 
/*
 * Callback receiving events from a DFileTrans.
 *
 * The progress fields of msg are refreshed first, then the event is mapped onto the
 * session-level action: waking the send thread, processing the session's transfers,
 * or notifying the application with the matching DFILE_ON_* event. Unknown events
 * are only logged. Last, CheckTransDone() releases the transfer if the event was
 * terminal.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
                              DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *owner = peer->session;
    uint8_t needNotify = NSTACKX_TRUE;
    DFileMsgType event = DFILE_ON_TRANS_IN_PROGRESS; /* overwritten by every notifying case */

    UpdateMsgProcessInfo(owner, dFileTrans, msgType, msg);

    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            WakeSendThread(owner, dFileTrans->isSender, peer->socketIndex);
            needNotify = NSTACKX_FALSE;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            ProcessSessionTrans(owner, dFileTrans->transId);
            needNotify = NSTACKX_FALSE;
            break;
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            event = DFILE_ON_FILE_LIST_RECEIVED;
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED: /* Receiver receive all the file data */
            event = DFILE_ON_FILE_RECEIVE_SUCCESS;
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            event = DFILE_ON_FILE_RECEIVE_FAIL;
            break;
        case DFILE_TRANS_MSG_FILE_SENT: /* Sender send TRANSFER DONE ACK frame and come to end */
            NoticeSessionProgress(owner);
            event = DFILE_ON_FILE_SEND_SUCCESS;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            event = DFILE_ON_FILE_SEND_FAIL;
            break;
        case DFILE_TRANS_MSG_IN_PROGRESS:
            event = DFILE_ON_TRANS_IN_PROGRESS;
            break;
        default:
            DFILE_LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            needNotify = NSTACKX_FALSE;
            break;
    }

    if (needNotify) {
        NotifyMsgRecver(owner, event, msg);
    }

    CheckTransDone(owner, dFileTrans, msgType);
}
375 
ServerSettingTimeoutHandle(void * data)376 static void ServerSettingTimeoutHandle(void *data)
377 {
378     PeerInfo *peerInfo = data;
379     ListRemoveNode(&peerInfo->list);
380     TimerDelete(peerInfo->settingTimer);
381     peerInfo->session->peerInfoCnt--;
382     free(peerInfo);
383     DFILE_LOGI(TAG, "DFileServer Setting Negotationion timeout");
384 }
385 
ClientSettingTimeoutHandle(void * data)386 static void ClientSettingTimeoutHandle(void *data)
387 {
388     PeerInfo *peerInfo = data;
389     uint8_t cnt = peerInfo->settingTimeoutCnt++;
390     DFileMsg msgData;
391     uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
392     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
393     if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
394         TimerDelete(peerInfo->settingTimer);
395         peerInfo->settingTimer = NULL;
396         peerInfo->settingTimeoutCnt = 0;
397         msgData.errorCode = NSTACKX_EFAILED;
398         DFILE_LOGI(TAG, "DFileClient Setting Negotationion timeout");
399         NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
400     } else {
401         DFileSessionSendSetting(peerInfo);
402         DFILE_LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
403         if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
404             msgData.errorCode = NSTACKX_EFAILED;
405             NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
406             DFILE_LOGE(TAG, "Timer setting timer fail");
407         }
408     }
409 }
410 
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)411 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
412 {
413     List *pos = NULL;
414     DFileTrans *trans = NULL;
415 
416     if (dFileTransChain == NULL || transId == 0) {
417         return NULL;
418     }
419 
420     LIST_FOR_EACH(pos, dFileTransChain) {
421         trans = (DFileTrans *)pos;
422         if (trans->transId == transId) {
423             return trans;
424         }
425     }
426     return NULL;
427 }
428 
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)429 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
430 {
431     List *pos = NULL;
432     PeerInfo *peerInfo = NULL;
433     LIST_FOR_EACH(pos, &session->peerInfoChain) {
434         peerInfo = (PeerInfo *)pos;
435         if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
436             peerInfo->session->sessionId == session->sessionId) {
437             return peerInfo;
438         }
439     }
440     return NULL;
441 }
442 
/*
 * Encode a Setting frame for the peer and write it out, starting the negotiation
 * timer first when the peer does not have one yet.
 *
 * Frame content: the peer's connType, its local MTU, the session's
 * NSTACKX_CAPS_LINK_SEQUENCE capability bit and the RECV_FEEDBACK caps-check flag.
 * Cipher capabilities and device bits are filled in only when the file manager has
 * a key (keyLen != 0).
 *
 * Timer: a client session uses ClientSettingTimeoutHandle with the TCP or default
 * negotiate timeout and reports DFILE_ON_CONNECT_FAIL if the timer cannot be
 * created. Any other session type uses ServerSettingTimeoutHandle with
 * MAX_SERVER_NEGOTIATE_VALID_TIMEOUT and fails silently. In both failure cases the
 * frame is not sent and peerInfo->settingTimer stays NULL.
 *
 * A write result other than the full frame length or NSTACKX_EAGAIN is reported as
 * DFILE_ON_CONNECT_FAIL.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    SettingFrame settingFramePara;

    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    /*
     * Zero the whole frame description: several fields (e.g. deviceBits when no key
     * is configured) are not assigned below and must not reach the encoder as
     * indeterminate values.
     */
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = peerInfo->session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    if (peerInfo->session->fileManager->keyLen) {
        DFileGetCipherCaps(peerInfo->session, &settingFramePara);
        settingFramePara.deviceBits = DFileGetDeviceBits();
    }
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    if (peerInfo->settingTimer == NULL) {
        if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
            peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, timeout,
                                                NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "setting timmer creat fail");
                data.errorCode = NSTACKX_EFAILED;
                NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
                return;
            }
        } else {
            peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                                                NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                return;
            }
        }
    }

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
    }
}
494 
/*
 * Apply a freshly obtained DFileConfig to the peer and to the session's file manager.
 *
 * The peer's maximum rate and data frame size are taken from the config; the initial
 * send rate is the configured rate divided by the WLAN or P2P start-up divisor and is
 * never allowed below NSTACKX_MIN_SENDRATE. Outside LiteOS, a WLAN peer without a
 * known wifi rate instead starts from a rate derived from NSTACKX_WLAN_INIT_RATE.
 * The file manager receives the frame length and, for a server session, the receive
 * parameters for the connection type; failures of those two calls are only logged.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    const uint8_t overWlan = (connType == CONNECT_TYPE_WLAN);

    /* Reset the congestion-measurement state of the peer. */
    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;
    if (overWlan) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    if (overWlan && !peerInfo->gotWifiRate) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set max frame length");
    }

    DFILE_LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
        connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);

    if (session->sessionType == DFILE_SESSION_TYPE_SERVER &&
        FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set recv para");
    }
}
531 
/*
 * Record the negotiation state and the values announced in a decoded Setting frame
 * (MTU, connection type, remote session id) on the peer. The MTU in use becomes the
 * smaller of the local and the announced MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    const uint16_t ownMtu = peerInfo->localMtu;

    peerInfo->state = state;
    peerInfo->connType = settingFrame->connType;
    peerInfo->remoteSessionId = settingFrame->header.sessionId;
    peerInfo->mtu = settingFrame->mtu;

    if (ownMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = ownMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }
}
541 
/*
 * Client-side handling of the server's Setting reply.
 *
 * Frames that fail to decode or announce an MTU of 0 are dropped, as are frames from
 * an address that matches no known peer. Otherwise the negotiation timer is stopped,
 * the peer is marked SETTING_NEGOTIATED, every transfer in the session chain gets
 * the negotiated MTU, DFILE_ON_CONNECT_SUCCESS is raised, the session config for the
 * negotiated MTU / connection type is applied, and the Link Sequence capability and
 * cipher type are settled from the reply.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    /* unsupport version */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }
    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer setting, maybe be attacked");
        return;
    }
    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
    /* The reply arrived, so the retry timer is no longer needed. */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "set trans mtu failed");
        }
    }
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }
    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* Bit set in the reply: the server accepted Link Sequence, keep the capability. */
        DFILE_LOGI(TAG, "server replies support Link Sequence");
    } else {
        DFILE_LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
    DFileChooseCipherType(&hostSettingFrame, session);
}
585 
/*
 * Local MTU for the given transport protocol: NSTACKX_DEFAULT_FRAME_SIZE for UDP
 * and TCP, 0 for D2D (unsupported, an error is logged) and for any other value.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    /* for udp, return NSTACKX_DEFAULT_FRAME_SIZE, for D2D, need call D2D MTU interface */
    switch (protocol) {
        case NSTACKX_PROTOCOL_UDP:
        case NSTACKX_PROTOCOL_TCP:
            return NSTACKX_DEFAULT_FRAME_SIZE;
        case NSTACKX_PROTOCOL_D2D:
            DFILE_LOGE(TAG, "d2d not support");
            return 0;
        default:
            return 0;
    }
}
600 
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)601 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
602     uint16_t connType, uint8_t socketIndex)
603 {
604     if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
605         return NULL;
606     }
607     PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
608     if (peerInfo == NULL) {
609         return NULL;
610     }
611     peerInfo->session = session;
612     peerInfo->dstAddr = *dstAddr;
613     peerInfo->connType = connType;
614     peerInfo->socketIndex = socketIndex;
615     peerInfo->localMtu = DFileGetMTU(session->protocol);
616     session->peerInfoCnt++;
617     peerInfo->gotWifiRate = 0;
618     peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
619     peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
620 
621     if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
622         peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
623         DFILE_LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
624     }
625 
626     if (peerMtu == 0) {
627         return peerInfo;
628     }
629     peerInfo->mtu = peerMtu;
630     peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
631     return peerInfo;
632 }
633 
/*
 * Server-side capability negotiation from the client's Setting frame.
 *
 * Link Sequence stays enabled on the session only if the client set
 * NSTACKX_CAPS_LINK_SEQUENCE in its capability field. Receive feedback is recorded
 * in session->capsCheck according to the client's capsCheck field.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    if (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        DFILE_LOGI(TAG, "client wants to enable Link Sequence");
    } else {
        DFILE_LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }

    /*
     * The sender advertises NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK in the capsCheck field
     * (its capability field is masked down to the Link Sequence bit), so the flag has
     * to be read from capsCheck as well.
     */
    if (hostSettingFrame->capsCheck & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) {
        DFILE_LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        DFILE_LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
651 
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)652 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
653     uint8_t socketIndex)
654 {
655     PeerInfo *peerInfo = NULL;
656 
657     peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
658     if (peerInfo == NULL) {
659         return NULL;
660     }
661 
662     peerInfo->remoteSessionId = frame->header.sessionId;
663     peerInfo->state = SETTING_NEGOTIATING;
664     DFileSessionSendSetting(peerInfo);
665     if (peerInfo->settingTimer == NULL) {
666         free(peerInfo);
667         session->peerInfoCnt--;
668         return NULL;
669     }
670     ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
671     return peerInfo;
672 }
673 
/*
 * Server-side handling of a client's Setting frame.
 *
 * Frames that fail to decode or announce an MTU of 0 are dropped. A known peer is
 * updated from the frame and answered again, unless it has already been answered
 * MAX_NEGOTIATE_TIMEOUT_COUNT times, in which case the frame is dropped. An unknown
 * peer is allocated (which also sends the answer). After a successful answer the
 * per-peer counter is incremented, the session config for the negotiated MTU /
 * connection type is applied, and capabilities and cipher type are negotiated.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame decoded;
    DFileConfig config;
    (void)memset_s(&decoded, sizeof(decoded), 0, sizeof(decoded));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &decoded) != NSTACKX_EOK || decoded.mtu == 0) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        peer = AllocPeerInfo(session, peerAddr, &decoded, socketIndex);
        if (peer == NULL) {
            return;
        }
    } else if (peer->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
        DFILE_LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
        return;
    } else {
        DFileSessionSetPeerInfo(peer, SETTING_NEGOTIATING, &decoded);
        DFileSessionSendSetting(peer);
    }

    peer->settingTimeoutCnt++;
    DFILE_LOGI(TAG, "DFileServer response Setting Frame. count %u", peer->settingTimeoutCnt);
    peer->remoteDFileVersion = decoded.dFileVersion;
    if (GetDFileConfig(&config, peer->mtuInuse, decoded.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, decoded.connType, peer);
    }
    HandleLinkSeqCap(session, &decoded);
    DFileChooseCipherType(&decoded, session);
}
713 
/*
 * Dispatch a received Setting frame to the server or client handler according to
 * the session type; any other session type ignores the frame.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    switch (session->sessionType) {
        case DFILE_SESSION_TYPE_SERVER:
            DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
            break;
        case DFILE_SESSION_TYPE_CLIENT:
            DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
            break;
        default:
            break;
    }
}
725 
/*
 * React to an RST frame carrying NSTACKX_DFILE_WITHOUT_SETTING_ERROR.
 *
 * The RST is ignored when SearchPeerInfoNode() finds no peer for peerAddr. Otherwise the MTU
 * of every transfer in the session is set to 0, and the first peer in peerInfoChain whose
 * address and session id match gets its setting timer deleted, is put back into
 * SETTING_NEGOTIATING and is sent a new Setting frame.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *node = NULL;

    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer rst, maybe be attacked");
        return;
    }

    /*
     * After the peer reports NSTACKX_DFILE_WITHOUT_SETTING_ERROR, clear the MTU of every
     * transfer; it is expected to be set again once a new setting negotiation completes.
     */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        if (DFileTransSetMtu((DFileTrans *)node, 0) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    /* Restart the setting negotiation with the matching peer only. */
    LIST_FOR_EACH(node, &session->peerInfoChain) {
        PeerInfo *candidate = (PeerInfo *)node;
        if (memcmp(&candidate->dstAddr, peerAddr, sizeof(struct sockaddr_in)) != 0 ||
            candidate->session->sessionId != session->sessionId) {
            continue;
        }
        TimerDelete(candidate->settingTimer);
        candidate->settingTimer = NULL;
        candidate->state = SETTING_NEGOTIATING;
        DFILE_LOGD(TAG, "Send Setting Frame");
        DFileSessionSendSetting(candidate);
        break;
    }
}
761 
/*
 * Handle a received RST frame.
 *
 * The frame is dropped when DecodeRstFrame() fails. NSTACKX_DFILE_WITHOUT_SETTING_ERROR is
 * the only error code acted upon; every other code is just logged.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t rstCode = 0;
    if (DecodeRstFrame((RstFrame *)dFileFrame, &rstCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    /* The header stores the transfer id in network byte order. */
    uint16_t transId = ntohs(dFileFrame->header.transId);
    DFILE_LOGD(TAG, "handle RST (%hu) frame, transId %hu", rstCode, transId);

    if (rstCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
    } else {
        DFILE_LOGE(TAG, "Unspported error code %hu", rstCode);
    }
}
781 
/*
 * Apply a decoded back-pressure report to the first `count` entries of session->stopSendCnt.
 *
 * Under backPressLock, each entry is incremented when backPress.recvListOverIo is 1 and
 * reset to 0 otherwise. Nothing is modified if the lock cannot be taken.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    const uint8_t receiverOverloaded = (backPress.recvListOverIo == 1) ? NSTACKX_TRUE : NSTACKX_FALSE;
    for (uint32_t slot = 0; slot < count; slot++) {
        if (receiverOverloaded) {
            session->stopSendCnt[slot]++;
        } else {
            session->stopSendCnt[slot] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
808 
/*
 * Handle a received back-pressure frame.
 *
 * The frame is ignored when the sender is not a known peer or when it cannot be decoded.
 * Otherwise the decoded report is applied to session->clientSendThreadNum stop-send
 * counters and then logged.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return;
    }

    DataBackPressure report;
    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &report) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, report, session->clientSendThreadNum);

    DFILE_LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
        report.recvListOverIo, report.recvBufThreshold,
        report.stopSendPeriod);
}
831 
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)832 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
833 {
834     DFileTransPara transPara;
835 
836     if (peerInfo == NULL) {
837         return NULL;
838     }
839     (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
840     transPara.isSender = isSender;
841     transPara.transId = transId; /* for receiver, use transId of sender */
842     transPara.fileManager = session->fileManager;
843     transPara.writeHandle = DFileWriteHandle;
844     transPara.msgReceiver = DTransMsgReceiver;
845     transPara.connType = peerInfo->connType;
846     transPara.context = peerInfo;
847     transPara.session = session;
848     transPara.onRenameFile = session->onRenameFile;
849 
850     ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
851     return DFileTransCreate(&transPara);
852 }
853 
/*
 * Route a transfer-level frame to the transfer it belongs to, creating a receiving transfer
 * on demand.
 *
 * Returns NSTACKX_EFAILED when the peer is unknown, the transfer id is 0, the frame refers
 * to an unknown transfer without being a file header frame, the transfer id is already
 * recorded as done, or the transfer cannot be created (in which case DFILE_ON_FATAL_ERROR
 * with NSTACKX_ENOMEM is reported). Otherwise returns the result of HandleDFileFrame().
 */
static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        DFILE_LOGI(TAG, "can't find peerInfo");
        return NSTACKX_EFAILED;
    }

    /* On a server session, a frame arriving while still negotiating ends the negotiation. */
    if (peer->session->sessionType == DFILE_SESSION_TYPE_SERVER && peer->state == SETTING_NEGOTIATING) {
        peer->state = SETTING_NEGOTIATED;
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
    }

    uint16_t transId = ntohs(dFileFrame->header.transId);
    if (transId == 0) {
        DFILE_LOGE(TAG, "transId is 0");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), transId);
    if (trans != NULL) {
        return HandleDFileFrame(trans, dFileFrame);
    }

    /* Only HEADER frame can start dfile transfer (receiver) */
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
        DFILE_LOGE(TAG, "trans %u is NULL && type is %u", transId, dFileFrame->header.type);
        return NSTACKX_EFAILED;
    }
    if (IsTransIdDone(session, transId) == NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }

    trans = CreateTrans(transId, session, peer, NSTACKX_FALSE);
    if (trans == NULL) {
        DFileMsg fatalMsg;
        (void)memset_s(&fatalMsg, sizeof(fatalMsg), 0, sizeof(fatalMsg));
        fatalMsg.errorCode = NSTACKX_ENOMEM;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &fatalMsg);
        DFILE_LOGE(TAG, "trans is NULL");
        return NSTACKX_EFAILED;
    }

    /* A failure to set the MTU is logged but does not abort the new transfer. */
    if (DFileTransSetMtu(trans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    CalculateSessionTransferRatePrepare(session);
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    peer->currentTransCount++;

    return HandleDFileFrame(trans, dFileFrame);
}
903 
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)904 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
905 {
906     PeerInfo *peerInfo = context;
907     DFileSession *session = peerInfo->session;
908     QueueNode *queueNode = NULL;
909     struct sockaddr_in peerAddr;
910 
911     peerAddr = peerInfo->dstAddr;
912     queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
913     if (queueNode == NULL) {
914         return NSTACKX_ENOMEM;
915     }
916 
917     if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
918         DFILE_LOGE(TAG, "pthread mutex lock failed");
919         DestroyQueueNode(queueNode);
920         return NSTACKX_EFAILED;
921     }
922     ListInsertTail(&session->outboundQueue, &queueNode->list);
923     session->outboundQueueSize++;
924     if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
925         /* Don't need to free node, as it's mount to outboundQueue */
926         DFILE_LOGE(TAG, "pthread mutex unlock failed");
927         return NSTACKX_EFAILED;
928     }
929     SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
930     return (int32_t)len;
931 }
932 
BindMainLoopToTargetCpu(void)933 static void BindMainLoopToTargetCpu(void)
934 {
935     int32_t cpu;
936     int32_t cpus = GetCpuNum();
937     if (cpus >= FIRST_CPU_NUM_LEVEL) {
938         return;
939     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
940         cpu = CPU_IDX_0;
941     } else {
942         return;
943     }
944     StartThreadBindCore(cpu);
945 }
946 
/*
 * Compute the timeout to pass to the next epoll wait.
 *
 * Returns 0 when the main loop is expected to read and inbound data is already queued.
 * Otherwise returns the smallest non-negative value reported by DFileTransGetTimeout() over
 * all transfers, capped at DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t transTimeout = DFileTransGetTimeout((DFileTrans *)node);
        /* Negative timeouts are skipped when looking for the minimum. */
        if (transTimeout >= 0 && transTimeout < shortest) {
            shortest = transTimeout;
        }
    }

    if (shortest > DEFAULT_WAIT_TIME_MS) {
        return DEFAULT_WAIT_TIME_MS;
    }
    return shortest;
}
969 
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)970 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
971 {
972     List *tmp = NULL;
973     List *pos = NULL;
974     LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
975         DFileTrans *trans = (DFileTrans *)pos;
976         if (trans->transId != exceptTransId) {
977             DFileTransProcess(trans);
978         }
979     }
980 }
981 
NotifyBindPort(const DFileSession * session)982 static void NotifyBindPort(const DFileSession *session)
983 {
984     DFileMsg data;
985     int32_t socketNum;
986     (void)memset_s(&data, sizeof(data), 0, sizeof(data));
987     socketNum = 1;
988 
989     for (int i = 0; i < socketNum; i++) {
990         data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
991         data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
992     }
993     NotifyMsgRecver(session, DFILE_ON_BIND, &data);
994 }
995 
DFileMainLoop(void * arg)996 void *DFileMainLoop(void *arg)
997 {
998     DFileSession *session = arg;
999     int32_t ret = NSTACKX_EOK;
1000     DFileMsg msgData;
1001     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
1002     uint8_t isBind = NSTACKX_FALSE;
1003     DFILE_LOGI(TAG, "main thread start");
1004     SetThreadName(DFFILE_MAIN_THREAD_NAME);
1005     SetMaximumPriorityForThread();
1006     SetTidToBindInfo(session, POS_MAIN_THERD_START);
1007     NotifyBindPort(session);
1008     while (!session->closeFlag) {
1009         int64_t minTimeout = GetEpollWaitTimeOut(session);
1010         ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
1011         if (ret == NSTACKX_EFAILED) {
1012             DFILE_LOGE(TAG, "epoll wait failed");
1013             break;
1014         }
1015         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1016             BindMainLoopToTargetCpu();
1017             isBind = NSTACKX_TRUE;
1018         }
1019         ProcessSessionTrans(session, 0);
1020         if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
1021             session->partReadFlag = NSTACKX_TRUE;
1022             ReadEventHandle(session);
1023             session->partReadFlag = NSTACKX_FALSE;
1024         }
1025     }
1026 
1027     if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
1028         msgData.errorCode = NSTACKX_EFAILED;
1029         NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
1030     }
1031 
1032     /* Notify sender thread to terminate */
1033     PostOutboundQueueWait(session);
1034 
1035     /* Unblock "select" and notify receiver thread to terminate */
1036     NotifyPipeEvent(session);
1037     return NULL;
1038 }
1039 
/* Reset the peer's amended send rate to its current send rate and log the resulting values. */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    peerInfo->amendSendRate = peerInfo->sendRate;

    DFILE_LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
        peerInfo->sendRate,
        peerInfo->sendFrameRate,
        peerInfo->amendSendRate);
}
1046 
/*
 * Periodically compute and log the sender-side statistics of one peer.
 *
 * Nothing happens unless the session is a client session and the time elapsed since
 * peerInfo->startTime (as returned by GetTimeDiffUs()) exceeds peerInfo->rateStateInterval.
 * When it does:
 *   - for the peer with socketIndex 0 only, the file manager's IO read rate is computed and
 *     its read byte counter and send-list-full counter are reset;
 *   - the peer's frame rate, throughput and average qdisc value are computed;
 *   - a summary line is logged;
 *   - the amended send rate is reset, session and peer statistics are cleared and
 *     peerInfo->startTime is restarted.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    uint64_t sendCount;

    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* Statistics are only refreshed once per measurement interval. */
    if (measureElapse <= peerInfo->rateStateInterval) {
        return;
    }

    sendCount = (uint64_t)peerInfo->sendCount;
    /* ONLY PEER 0 calculate io rate */
    if (peerInfo->socketIndex == 0) {
        session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        DFILE_LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
            session->fileManager->sendListFullTimes);
        session->fileManager->sendListFullTimes = 0;
        session->fileManager->iorBytes = 0;
    }

    peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
    peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
        NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
    /* qdiscAveLeft holds a running sum until it is turned into an average here. */
    if (peerInfo->qdiscSearchNum != 0) {
        peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
    }

    /*
     * Adjacent string literals are concatenated verbatim, so the fragment ending in
     * "ave qdisc %u" needs its own trailing space to keep that field separate from
     * "totalRecvBlocks" in the emitted line.
     */
    DFILE_LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu,"
              "measureElapse %llu sendFrameRate %u %uMB/s,"
              "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u,"
              "min qdisc %u max qdisc %u search num %u ave qdisc %u "
              "totalRecvBlocks %llu socket:%u "
              "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
        peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
        peerInfo->sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
        NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
        NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
        NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
        peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
        NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
        peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);
    AmendPeerInfoSendRate(peerInfo);
    ClearSessionStats(session);
    ClearPeerinfoStats(peerInfo);
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
}
1098 
/*
 * Store in peerInfo->allDtransRetryCount the sum of retryCount over every transfer of the
 * session.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t retrySum = 0;
    List *node = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        retrySum += ((DFileTrans *)node)->retryCount;
    }

    peerInfo->allDtransRetryCount = retrySum;
}
1110 
/*
 * Update receive-side statistics when a file data frame arrives on a server session.
 *
 * Frames of any other type, frames on non-server sessions and frames from unknown peers are
 * ignored. For an accepted frame the peer's receive counter is incremented, then two
 * independent measurement windows are checked against peerInfo->rateStateInterval:
 *   - the session window (session->measureBefore) refreshes the file manager's IO write
 *     rate, write count and maximum write rate, and resets the written-bytes counter;
 *   - the peer window (peerInfo->startTime) refreshes the peer's receive frame rate and
 *     throughput and resets the receive counter.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    struct timespec nowTime;
    uint64_t recvCount;
    uint32_t timeOut;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER ||
        dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        return;
    }

    timeOut = (peerInfo->connType == CONNECT_TYPE_P2P) ? NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT :
        NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;

    peerInfo->recvCount++;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
    if (measureElapse > peerInfo->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
        /*
         * dataFrameSize is a divisor here. Skip the update (keeping the previous iowCount)
         * rather than divide by zero if no frame size has been configured for this peer.
         * NOTE(review): whether dataFrameSize can really be 0 at this point depends on code
         * outside this function - confirm.
         */
        if (peerInfo->dataFrameSize != 0) {
            session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
                (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
        }
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        DFILE_LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
            session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    /* The peer window reuses nowTime taken above; it is not re-sampled. */
    measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    if (measureElapse > peerInfo->rateStateInterval) {
        recvCount = (uint64_t)peerInfo->recvCount;
        peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        peerInfo->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1157 
CheckElapseTime(const struct timespec * before,uint64_t overRun)1158 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1159 {
1160     struct timespec now;
1161     ClockGetTime(CLOCK_MONOTONIC, &now);
1162     uint64_t elapseUs = GetTimeDiffUs(&now, before);
1163     elapseUs += overRun;
1164     if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1165         if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1166             DFILE_LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1167         }
1168         return 0;
1169     } else {
1170         uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1171         return delta;
1172     }
1173 }
1174 
BindClientSendThreadToTargetCpu(uint32_t idx)1175 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1176 {
1177     int32_t cpu;
1178     int32_t cpus = GetCpuNum();
1179     if (cpus >= FIRST_CPU_NUM_LEVEL) {
1180         return;
1181     } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1182         cpu = CPU_IDX_1 + (int32_t)idx;
1183     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1184         cpu = CPU_IDX_1;
1185     } else {
1186         return;
1187     }
1188     if (cpu > cpus) {
1189         cpu = cpus - 1;
1190     }
1191     StartThreadBindCore(cpu);
1192 }
1193 
/*
 * For client sessions, restart the measurement clock of the peer attached to socketIndex
 * and bind the calling thread to its CPU (thread index 0). Does nothing for other session
 * types or when no peer is found for the socket index.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer != NULL) {
        ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
        BindClientSendThreadToTargetCpu(0);
    }
}
1205 
TerminateMainThreadFatalInner(void * arg)1206 static void TerminateMainThreadFatalInner(void *arg)
1207 {
1208     DFileSession *session = (DFileSession *)arg;
1209     DFileSessionSetFatalFlag(session);
1210 }
1211 
/*
 * Ask for the session's fatal flag to be set through a posted event (callback
 * TerminateMainThreadFatalInner). If the event cannot be posted, the flag is set directly
 * from the calling thread instead.
 */
static void PostFatalEvent(DFileSession *session)
{
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session) == NSTACKX_EOK) {
        return;
    }

    DFILE_LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1219 
GetSocketWaitMs(uint32_t threadNum)1220 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1221 {
1222     if (threadNum > 1) {
1223         return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1224     }
1225     return DEFAULT_WAIT_TIME_MS;
1226 }
1227 
SetSendThreadName(uint32_t threadIdx)1228 static void SetSendThreadName(uint32_t threadIdx)
1229 {
1230     char name[MAX_THREAD_NAME_LEN] = {0};
1231     if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1232         DFILE_LOGE(TAG, "sprintf send thead name failed");
1233     }
1234     SetThreadName(name);
1235 }
1236 
/* Start-up arguments handed to an additional sender thread (see DFileAddiSenderHandle). */
typedef struct {
    struct DFileSession *session; /* session the thread sends for */
    uint32_t threadIdx;           /* index of the thread's slot in session->sendThreadPara */
} SendThreadCtx;
1241 
DFileAddiSenderHandle(void * arg)1242 static void *DFileAddiSenderHandle(void *arg)
1243 {
1244     SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
1245     struct DFileSession *session = sendThreadCtx->session;
1246     uint32_t threadIdx = sendThreadCtx->threadIdx;
1247     free(sendThreadCtx);
1248     int32_t ret = NSTACKX_EOK;
1249     DFILE_LOGI(TAG, "send thread %u start", threadIdx);
1250     SetSendThreadName(threadIdx);
1251     SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
1252     List unsent;
1253     uint8_t canWrite = NSTACKX_FALSE;
1254     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1255     BindClientSendThreadToTargetCpu(threadIdx + 1);
1256     ListInitHead(&unsent);
1257     while (!session->addiSenderCloseFlag) {
1258         if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
1259             NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1260             SemWait(&session->sendThreadPara[threadIdx].sendWait);
1261             if (session->addiSenderCloseFlag) {
1262                 break;
1263             }
1264         }
1265         if (ret == NSTACKX_EAGAIN) {
1266             ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
1267             if (ret != NSTACKX_EOK || session->closeFlag) {
1268                 break;
1269             }
1270             if (!canWrite) {
1271                 ret = NSTACKX_EAGAIN;
1272                 continue;
1273             }
1274         }
1275         ret = SendDataFrame(session, &unsent, threadIdx, 0);
1276         if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
1277             break;
1278         }
1279         if (ret == NSTACKX_EAGAIN) {
1280             continue;
1281         }
1282         SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
1283     }
1284     if (ret < 0 && ret != NSTACKX_EAGAIN) {
1285         PostFatalEvent(session);
1286     }
1287     DestroyIovList(&unsent, session, threadIdx);
1288     return NULL;
1289 }
1290 
CloseAddiSendThread(struct DFileSession * session)1291 static void CloseAddiSendThread(struct DFileSession *session)
1292 {
1293     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1294         return;
1295     }
1296     session->addiSenderCloseFlag = NSTACKX_TRUE;
1297     for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1298         SemPost(&session->sendThreadPara[i].sendWait);
1299         SemPost(&session->sendThreadPara[i].semNewCycle);
1300         PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1301         SemDestroy(&session->sendThreadPara[i].sendWait);
1302         SemDestroy(&session->sendThreadPara[i].semNewCycle);
1303     }
1304 }
1305 
ErrorHandle(struct DFileSession * session,uint16_t cnt)1306 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1307 {
1308     while (cnt > 0) {
1309         SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1310         SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1311         PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1312         SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1313         SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1314         cnt--;
1315     }
1316 }
1317 
CreateAddiSendThread(struct DFileSession * session)1318 static int32_t CreateAddiSendThread(struct DFileSession *session)
1319 {
1320     uint16_t i;
1321     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1322         return NSTACKX_EOK;
1323     }
1324     session->addiSenderCloseFlag = NSTACKX_FALSE;
1325     for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1326         SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1327         if (sendThreadCtx == NULL) {
1328             goto L_ERR_SENDER_THREAD;
1329         }
1330 
1331         if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1332             free(sendThreadCtx);
1333             goto L_ERR_SENDER_THREAD;
1334         }
1335 
1336         if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1337             SemDestroy(&session->sendThreadPara[i].sendWait);
1338             free(sendThreadCtx);
1339             goto L_ERR_SENDER_THREAD;
1340         }
1341 
1342         sendThreadCtx->session = session;
1343         sendThreadCtx->threadIdx = i;
1344         if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1345             DFILE_LOGE(TAG, "Create sender thread failed");
1346             SemDestroy(&session->sendThreadPara[i].sendWait);
1347             SemDestroy(&session->sendThreadPara[i].semNewCycle);
1348             free(sendThreadCtx);
1349             goto L_ERR_SENDER_THREAD;
1350         }
1351     }
1352     return NSTACKX_EOK;
1353 
1354 L_ERR_SENDER_THREAD:
1355     session->addiSenderCloseFlag = NSTACKX_TRUE;
1356     ErrorHandle(session, i);
1357     return NSTACKX_EFAILED;
1358 }
1359 
/*
 * Fold one qdisc sample into the peer's statistics: minimum, maximum, sample count and
 * running sum (qdiscAveLeft). A stored minimum of 0 is treated as "no minimum yet" and is
 * replaced by the sample.
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }

    if (peerInfo->qdiscMaxLeft < qDiscLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1373 
/*
 * Cap the peer's amended send rate by the value reported by GetQdiscLen() for the root
 * queue of the peer's local interface.
 *
 * Nothing changes when the query fails or the peer has no MTU in use. Otherwise the sample
 * is recorded in the peer statistics, divided by the number of MTUs per data frame (at
 * least 1), and amendSendRate becomes the smaller of that quotient and sendRate.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t qDiscLeft;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qDiscLeft) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, qDiscLeft);

    uint32_t qDiscLeftSendRate = qDiscLeft / mtusPerFrame;
    if (peerInfo->sendRate >= qDiscLeftSendRate) {
        peerInfo->amendSendRate = (int32_t)qDiscLeftSendRate;
    } else {
        peerInfo->amendSendRate = peerInfo->sendRate;
    }
}
1397 
/*
 * Book-keeping at the end of one send cycle on socketIndex.
 *
 * The cycle is marked as no longer running. If the number of frames sent in this interval
 * reached the (positive) amended send rate sampled by the caller, the sender is paced via
 * CheckElapseTime(); a pending decreaseStatus first clears the carried overrun. The
 * per-interval send counter is then reset and the amended send rate is refreshed from the
 * qdisc.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    if (curAmendSendRate > 0 && peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate) {
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->decreaseStatus = 0;
            peerInfo->overRun = 0;
        }

        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        /* CheckElapseTime() returns 0 exactly when it took the sleeping branch. */
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1416 
/*
 * Run one send round for socketIndex on the main sender thread.
 *
 * Steps, in order:
 *   1. When CapsTcp(session) holds and `unsent` is not empty, the leftover data is sent
 *      first, with session->sendRemain set for the duration of that call.
 *   2. Outbound (queued) frames are sent; sessions that are not client sessions stop here
 *      and return that result.
 *   3. For client sessions with a peer whose MTU is in use, data frames are sent if step 2
 *      succeeded, and the send-rate statistics are updated.
 *   4. If the round ends without failure, NSTACKX_EAGAIN or a close request, the cycle
 *      book-keeping is updated and every additional sender thread is released for a new
 *      cycle.
 *
 * Returns the result of the last send call, or NSTACKX_EFAILED when the client peer cannot
 * be found.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
                                     struct timespec *before, uint8_t socketIndex)
{
    int32_t ret;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        session->sendRemain = 1;
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
            DFILE_LOGI(TAG, "ret is %d", ret);
            return ret;
        }
    }

    ret = SendOutboundFrame(session, preQueueNode);
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return ret;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    if (peer->mtuInuse == 0) {
        return ret;
    }

    if (ret == NSTACKX_EOK) {
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }

    /* Sample the rate before DFileSendCalculateRate(), which may overwrite amendSendRate. */
    int32_t sampledAmendRate = peer->amendSendRate;
    DFileSendCalculateRate(session, peer);

    if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
        DFILE_LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", ret, peer->intervalSendCount);
        return ret;
    }

    UpdateInfoAfterSend(session, peer, sampledAmendRate, before, socketIndex);

    /* Let every additional sender thread start its next cycle. */
    for (uint16_t slot = 0; slot < session->clientSendThreadNum - 1; slot++) {
        SemPost(&session->sendThreadPara[slot].semNewCycle);
    }
    return ret;
}
1466 
/*
 * Block the sender thread on outboundQueueWait[socketIndex] when there is nothing to send:
 * no carried-over queue node, an empty "unsent" list, no pending file-manager data and an
 * empty outbound queue. noPendingDataTimes is incremented each time the thread goes to wait.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL || !ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager) || session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1475 
/*
 * One-time setup at the start of a sender thread: set the thread name, refresh the measure
 * time, raise the thread priority and, when the computed slot is inside the send-thread
 * range, record the thread id in the session bind info.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t bindSlot = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    if (bindSlot < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, bindSlot);
    }
}
1488 
/*
 * Tear-down for a sender thread: log the total number of sent blocks, close the additional
 * send threads, release the "unsent" list and the carried-over queue node, then free the
 * thread argument.
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    DFILE_LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    CloseAddiSendThread(session);

    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    /* arg is released here, so the caller must not touch it afterwards. */
    free(arg);
}
1498 
DFileSenderHandle(void * arg)1499 void *DFileSenderHandle(void *arg)
1500 {
1501     DFileSession *session = ((SenderThreadPara*)arg)->session;
1502     uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1503     QueueNode *queueNode = NULL;
1504     List unsent;
1505     int32_t ret = NSTACKX_EOK;
1506     struct timespec before;
1507     uint8_t canWrite = NSTACKX_FALSE;
1508     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1509     uint8_t isBind = NSTACKX_FALSE;
1510 
1511     if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1512         PostFatalEvent(session);
1513         return NULL;
1514     }
1515     ListInitHead(&unsent);
1516     DFileSenderPre(session, socketIndex);
1517     while (!session->closeFlag) {
1518         WaitNewSendData(queueNode, &unsent, session, socketIndex);
1519         if (session->closeFlag) {
1520             break;
1521         }
1522         if (!session->cycleRunning[socketIndex]) {
1523             ClockGetTime(CLOCK_MONOTONIC, &before);
1524         }
1525         if (ret == NSTACKX_EAGAIN) {
1526             ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1527             if (ret != NSTACKX_EOK || session->closeFlag) {
1528                 break;
1529             }
1530             if (!canWrite) {
1531                 ret = NSTACKX_EAGAIN;
1532                 continue;
1533             }
1534         }
1535         if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1536             session->transFlag == NSTACKX_TRUE) {
1537             BindClientSendThreadToTargetCpu(0);
1538             isBind = NSTACKX_TRUE;
1539         }
1540         ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1541         if (ret < 0 && ret != NSTACKX_EAGAIN) {
1542             PeerShuttedEvent();
1543             PostFatalEvent(session);
1544             break;
1545         }
1546     }
1547     DFileSenderClose(session, queueNode, &unsent, arg);
1548     return NULL;
1549 }
1550 
/*
 * Unlink and destroy every queue node still linked on "head".
 * A NULL or empty list is a no-op.
 */
static void ClearDFileFrameList(List *head)
{
    List *cur = NULL;
    List *next = NULL;

    if (head == NULL || ListIsEmpty(head)) {
        return;
    }

    LIST_FOR_EACH_SAFE(cur, next, head) {
        /* The list entry is cast straight to its QueueNode, as the rest of this file does. */
        QueueNode *staleNode = (QueueNode *)cur;
        ListRemoveNode(&staleNode->list);
        DestroyQueueNode(staleNode);
    }
}
1564 
CheckDfileType(uint8_t type)1565 static inline int32_t CheckDfileType(uint8_t type)
1566 {
1567     if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1568         return NSTACKX_EFAILED;
1569     }
1570     return NSTACKX_EOK;
1571 }
1572 
/*
 * Validate a session type: NSTACKX_EOK when it lies in
 * [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER], NSTACKX_EFAILED otherwise.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    int inRange = (type >= DFILE_SESSION_TYPE_CLIENT) && (type <= DFILE_SESSION_TYPE_SERVER);

    return inRange ? NSTACKX_EOK : NSTACKX_EFAILED;
}
1580 
/*
 * Drain "head", dispatching each queued frame to its handler, until the list is empty or
 * the session is closing. Whatever is still linked afterwards is destroyed.
 *
 * Frame ownership: the frame buffer is freed here unless the frame is a FILE_DATA frame
 * that was handled successfully; the queue node itself is always freed.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    /* Kept at function scope as in the original: only the first and last dispatch branches assign it. */
    int32_t handleFrameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }

        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *from = &node->peerAddr;
        /* Saved up front because "frame" may already be invalid once the handler has run. */
        uint8_t type = frame->header.type;
        uint16_t payloadLen = ntohs(frame->header.length);

        if (payloadLen > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            DFILE_LOGE(TAG, "header length %u is too big", payloadLen);
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(type) != NSTACKX_EOK || CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            handleFrameRet = NSTACKX_EFAILED;
        } else if (type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, from, node->socketIndex);
        } else if (type == NSTACKX_DFILE_RST_FRAME && frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, from);
        } else if (type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, from);
        } else {
            /*
             * For NSTACKX_DFILE_FILE_DATA_FRAME, "frame" may be freed when handling fails, and become invalid.
             */
            handleFrameRet = DFileSessionHandleFrame(session, frame, from);
        }

        /* For a successfully handled FILE_DATA frame, the memory is passed to file manager. */
        if (handleFrameRet != NSTACKX_EOK || type != NSTACKX_DFILE_FILE_DATA_FRAME) {
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }

    ClearDFileFrameList(head);
}
1627 
ReadEventHandle(void * arg)1628 static void ReadEventHandle(void *arg)
1629 {
1630     DFileSession *session = arg;
1631     List newHead;
1632     ListInitHead(&newHead);
1633     struct timespec before, now;
1634     if (!session->partReadFlag) {
1635         NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1636     } else {
1637         ClockGetTime(CLOCK_MONOTONIC, &before);
1638     }
1639     while (session->inboundQueueSize && !session->closeFlag) {
1640         if (session->partReadFlag) {
1641             ClockGetTime(CLOCK_MONOTONIC, &now);
1642             if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
1643                 break;
1644             }
1645         }
1646         if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1647             DFILE_LOGE(TAG, "PthreadMutexLock error");
1648             return;
1649         }
1650         ListMove(&session->inboundQueue, &newHead);
1651         session->recvBlockNumInner += session->inboundQueueSize;
1652         session->inboundQueueSize = 0;
1653         if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1654             DFILE_LOGE(TAG, "PthreadMutexUnlock error");
1655             break;
1656         }
1657         ProcessDFileFrameList(session, &newHead);
1658     }
1659     ClearDFileFrameList(&newHead);
1660 }
1661 
/*
 * Wrap a received buffer in a queue node and append it to the session inbound queue.
 *
 * Returns NSTACKX_ENOMEM when the queue already holds more than MAX_RECVBUF_COUNT entries
 * or the node cannot be created, NSTACKX_EFAILED on a lock/unlock error, NSTACKX_EOK otherwise.
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
                                    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
        /* Log only when the size is a multiple of MAX_NOMEM_PRINT. */
        if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
            DFILE_LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
        }
        return NSTACKX_ENOMEM;
    }

    QueueNode *node = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    if (node == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        DestroyQueueNode(node);
        return NSTACKX_EFAILED;
    }
    ListInsertTail(&session->inboundQueue, &node->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* On unlock failure the node is already on the list, so it is not destroyed here. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
1691 
/* For server sessions, stamp session->measureBefore with the current monotonic time. */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1698 
/*
 * Bind the receive thread to a CPU core.
 *
 * Non-server sessions always bind to CPU_IDX_2. Server sessions pick the core from the CPU
 * count: no binding at or above FIRST_CPU_NUM_LEVEL, CPU_IDX_1 at or above
 * SECOND_CPU_NUM_LEVEL, CPU_IDX_0 at or above THIRD_CPU_NUM_LEVEL, and no binding below that.
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t cpuCount = GetCpuNum();

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        StartThreadBindCore(CPU_IDX_2);
        return;
    }

    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        StartThreadBindCore(CPU_IDX_1);
        return;
    }
    if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        StartThreadBindCore(CPU_IDX_0);
    }
}
1719 
/*
 * Ask the main loop to run ReadEventHandle(), unless MAX_UNPROCESSED_READ_EVENT_COUNT
 * read events are already pending. mainLoopActiveReadFlag is set when posting fails and
 * cleared when it succeeds.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    /* Posting failed: undo the counter increment and set mainLoopActiveReadFlag. */
    DFILE_LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1734 
/*
 * Handle one buffer read from the socket: decode it, queue it for the main loop, post a
 * read event and update the receive-rate statistics.
 *
 * A buffer that fails to decode, or that cannot be queued because of NSTACKX_ENOMEM, is
 * dropped and NSTACKX_EOK is still returned. Any other queueing error returns NSTACKX_EFAILED.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
                                     struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *decoded = NULL;

    if (DecodeDFileFrame(buf, bufLen, &decoded) != NSTACKX_EOK) {
        /* The buffer is not a valid frame: drop it without reporting an error to the caller. */
        DFILE_LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    int32_t queued = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    if (queued != NSTACKX_EOK) {
        if (queued == NSTACKX_ENOMEM) {
            /* Queue full or allocation failure: the frame is dropped and the caller sees success. */
            return NSTACKX_EOK;
        }
        DFILE_LOGE(TAG, "frame add in bound queue failed :%d", queued);
        return NSTACKX_EFAILED;
    }

    PostReadEventToMainLoop(session);
    DFileRecvCalculateRate(session, decoded, peerAddr);
    return NSTACKX_EOK;
}
1757 
/*
 * One-time setup at the start of the receiver thread: set the thread name, refresh the
 * session measure time, raise the thread priority and record the thread id in the bind
 * info at POS_RECV_THERD_START.
 */
static void DFileRecverPre(DFileSession *session)
{
    SetThreadName(DFFILE_RECV_THREAD_NAME);

    DFileReceiverUpdateSessionMeasureTime(session);

    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1765 
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for a read event on the socket currently in use:
 * the accepted socket once acceptFlag is set, otherwise socket[0].
 * "canRead" and the return value come from WaitSocketEvent().
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    if (session->acceptFlag != 0) {
        return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
    }
    return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1774 
/* Receive from the session socket; a thin wrapper that forwards to DFileSocketRecvSP(). */
int32_t DFileSocketRecv(DFileSession *session)
{
    int32_t recvRet = DFileSocketRecvSP(session);

    return recvRet;
}
1779 
/*
 * Accept a connection on socket[0] and store it as the session accept socket.
 *
 * On success acceptFlag is set, keep-alive is configured on the new descriptor and the
 * accept event is raised. Returns NSTACKX_EFAILED when the accept fails.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        DFILE_LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    DFILE_LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);

    AcceptSocketEvent();
    return NSTACKX_EOK;
}
1796 
DFileReceiverHandle(void * arg)1797 void *DFileReceiverHandle(void *arg)
1798 {
1799     DFileSession *session = arg;
1800     uint8_t canRead = NSTACKX_FALSE;
1801     int32_t ret = NSTACKX_EAGAIN;
1802     uint8_t isBind = NSTACKX_FALSE;
1803 
1804     DFILE_LOGI(TAG, "recv thread start");
1805     DFileRecverPre(session);
1806     while (!session->closeFlag) {
1807         if (ret == NSTACKX_EAGAIN || !canRead) {
1808             ret = RcverWaitSocket(session, &canRead);
1809             if (ret != NSTACKX_EOK || session->closeFlag) {
1810                 break;
1811             }
1812         }
1813         if (!canRead) {
1814             continue;
1815         }
1816         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1817             BindServerRecvThreadToTargetCpu(session);
1818             isBind = NSTACKX_TRUE;
1819         }
1820 
1821         ret = DFileSocketRecv(session);
1822         if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1823             PeerShuttedEvent();
1824             break;
1825         }
1826     }
1827     DFILE_LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect,
1828         session->recvBlockNumInner);
1829     if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1830         PostFatalEvent(session);
1831     }
1832 
1833     DFILE_LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1834     return NULL;
1835 }
1836 
DFileControlHandle(void * arg)1837 void *DFileControlHandle(void *arg)
1838 {
1839     SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1840     DFileSession *session = arg;
1841     if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1842         DFileSenderControlHandle(session);
1843     } else {
1844         DFileReceiverControlHandle(session);
1845     }
1846     return NULL;
1847 }
1848 
/*
 * Replace every entry of fileListInfo->files with its resolved path from realpath().
 *
 * Each original string is freed once its replacement is stored. Processing stops at the
 * first path that cannot be resolved; entries already replaced stay replaced.
 * Returns NSTACKX_EOK when all paths resolve, NSTACKX_EFAILED otherwise.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    for (uint32_t idx = 0; idx < fileListInfo->fileNum; idx++) {
        char *rawName = fileListInfo->files[idx];
        /* With a NULL second argument, realpath() allocates the result buffer. */
        char *resolved = realpath(rawName, NULL);
        if (resolved == NULL) {
            DFILE_LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        fileListInfo->files[idx] = resolved;
        free(rawName);
    }
    return NSTACKX_EOK;
}
1868 
/*
 * Free a FileListInfo container: the "files" pointer array, the remote path and the
 * structure itself. The strings referenced by files[] are not freed here.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    free(fileListInfo->files);
    fileListInfo->files = NULL;

    /* free(NULL) is a no-op, so remotePath needs no guard. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo);
}
1879 
/*
 * Create a sending transfer for fileListInfo and append it to the session transfer chain.
 * Called by DFileStartTrans() with transIdLock held.
 *
 * The transfer id is lastDFileTransId + 1, skipping 0 on wrap-around. On success the
 * FileListInfo container is freed here (its file name strings are reused by the transfer)
 * and the matching processing counter is incremented. On failure the transfer is destroyed
 * and fileListInfo is left to the caller.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_ENOMEM when the transfer cannot be created,
 * NSTACKX_EFAILED when no peer is available, a path cannot be resolved or the extra info
 * cannot be added, or the error returned by DFileTransSendFiles().
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    uint16_t transId = session->lastDFileTransId + 1;
    if (transId == 0) { /* overflow */
        transId = 1;
    }

    PeerInfo *peerInfo = TransSelectPeerInfo(session);
    if (peerInfo == NULL) {
        /* peerInfo->mtuInuse is dereferenced below, so reject a missing peer before creating anything. */
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
    if (trans == NULL) {
        DFILE_LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to set the MTU is logged only; the transfer continues. */
    if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    int32_t ret = DFileTransSendFiles(trans, fileListInfo);
    if (ret != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        DFILE_LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
        return ret;
    }
    ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
                                 fileListInfo->userData);
    if (ret != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    trans->fileList->tarFlag = fileListInfo->tarFlag;
    trans->fileList->smallFlag = fileListInfo->smallFlag;
    trans->fileList->tarFile = fileListInfo->tarFile;
    trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;

    /* userData has been passed to DFileTransAddExtraInfo(); clear the local reference. */
    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    session->lastDFileTransId = transId;
    if (fileListInfo->smallFlag == NSTACKX_TRUE) {
        session->smallListProcessingCnt++;
    } else {
        session->fileListProcessingCnt++;
    }
    /* Elements in fileListInfo->files[] are reused by dFileTrans, so don't need to free. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1931 
/*
 * Start a new transfer for fileListInfo while holding transIdLock.
 *
 * Returns NSTACKX_EFAILED when the lock cannot be taken, otherwise the result of
 * DFileStartTransInner(). An unlock failure is logged only.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    int32_t startRet;

    if (PthreadMutexLock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    startRet = DFileStartTransInner(session, fileListInfo);

    if (PthreadMutexUnlock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1944 
TerminateMainThreadInner(void * arg)1945 void TerminateMainThreadInner(void *arg)
1946 {
1947     DFileSession *session = (DFileSession *)arg;
1948     DFileSessionSetTerminateFlag(session);
1949 }
1950 
/*
 * Start the session worker threads in order: main loop, sender(s), receiver, control.
 *
 * When a later thread cannot be created, the threads started before it are stopped in
 * reverse order through the fall-through cleanup labels below.
 *
 * Returns NSTACKX_EOK when every thread is running, NSTACKX_EFAILED otherwise.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        DFILE_LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        DFILE_LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        DFILE_LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;
L_ERR_CONTROL_THREAD:
    /* The control thread was never created, so the thread to stop and join here is the receiver. */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /*
     * The sender may be blocked in SemWait() on outboundQueueWait, so release it before
     * joining (PostOutboundQueueWait() presumably posts that semaphore -- confirm).
     */
    PostOutboundQueueWait(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post a terminate event to the main loop thread before joining it. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1991 
/*
 * File manager message callback; "context" is the owning session.
 * An inner error is logged and raised as a fatal session event; a progress message
 * triggers a session progress notice. Other message types are ignored.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            DFILE_LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
2004 
/*
 * Create the file manager of a session.
 *
 * Validation, in order:
 *   - session must not be NULL (NSTACKX_EINVAL);
 *   - a sender must use CONNECT_TYPE_P2P or CONNECT_TYPE_WLAN (NSTACKX_EINVAL);
 *   - when keyLen is non-zero, key must be non-NULL with length AES_128_KEY_LENGTH or
 *     CHACHA20_KEY_LENGTH, and crypto support must be included (NSTACKX_EFAILED).
 *
 * Returns NSTACKX_EOK on success, NSTACKX_EFAILED when FileManagerCreate() fails.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    FileManagerMsgPara msgPara;

    if (session == NULL) {
        DFILE_LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }
    if (isSender && !(connType == CONNECT_TYPE_P2P || connType == CONNECT_TYPE_WLAN)) {
        DFILE_LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }

    if (keyLen > 0) {
        int lenSupported = (keyLen == AES_128_KEY_LENGTH) || (keyLen == CHACHA20_KEY_LENGTH);
        if (!lenSupported || key == NULL) {
            DFILE_LOGE(TAG, "error key or key len");
            return NSTACKX_EFAILED;
        }
        if (!IsCryptoIncluded()) {
            DFILE_LOGE(TAG, "crypto is not opened");
            return NSTACKX_EFAILED;
        }
    }

    /* File manager messages are delivered to FileManagerMsgHandle() with the session as context. */
    msgPara.msgReceiver = FileManagerMsgHandle;
    msgPara.context = session;
    msgPara.epollfd = session->epollfd;
    msgPara.eventNodeChain = &session->eventNodeChain;

    session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
    if (session->fileManager == NULL) {
        DFILE_LOGE(TAG, "create filemanager failed");
        return NSTACKX_EFAILED;
    }
    return NSTACKX_EOK;
}
2038 
/*
 * Close both ends of the receiver pipe (PIPE_OUT first, then PIPE_IN) and mark them
 * invalid. An end that is already INVALID_PIPE_DESC is left untouched.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const int pipeEnds[] = { PIPE_OUT, PIPE_IN };

    for (size_t i = 0; i < sizeof(pipeEnds) / sizeof(pipeEnds[0]); i++) {
        int end = pipeEnds[i];
        if (session->receiverPipe[end] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[end]);
        session->receiverPipe[end] = INVALID_PIPE_DESC;
    }
}
2050 
/* Install "func" as the global DFile event callback. softObj is unused. */
void DFileSetEvent(void *softObj, DFileEventFunc func)
{
    (void)softObj;
    g_dfileEventFunc = func;
}
2056 
NSTACKX_DFileAssembleFunc(void * softObj,const DFileEvent * info)2057 void NSTACKX_DFileAssembleFunc(void *softObj, const DFileEvent *info)
2058 {
2059     if (g_dfileEventFunc != NULL) {
2060         g_dfileEventFunc(softObj, info);
2061     }
2062 }
2063