• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nstackx_dfile_session.h"
17 
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 
38 #define TAG "nStackXDFile"
39 
40 #define DEFAULT_WAIT_TIME_MS               1000
41 #define MULTI_THREADS_SOCKET_WAIT_TIME_MS  1
42 #define DEFAULT_NEGOTIATE_TIMEOUT          1000
43 #define TCP_NEGOTIATE_TIMEOUT              10000
44 #define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
45 #define MAX_NEGOTIATE_TIMEOUT_COUNT        3
46 #define MAX_UNPROCESSED_READ_EVENT_COUNT   3
47 #define MAX_RECVBUF_COUNT                  130000
48 #define MAX_NOMEM_PRINT                    10000
49 
50 static void ReadEventHandle(void *arg);
51 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
52 
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)53 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
54     const struct sockaddr_in *peerAddr, uint8_t socketIndex)
55 {
56     QueueNode *queueNode = NULL;
57 
58     if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
59         return NULL;
60     }
61 
62     queueNode = calloc(1, sizeof(QueueNode));
63     if (queueNode == NULL) {
64         return NULL;
65     }
66 
67     queueNode->frame = calloc(1, length);
68     if (queueNode->frame == NULL) {
69         free(queueNode);
70         return NULL;
71     }
72     queueNode->length = length;
73     queueNode->sendLen = 0;
74 
75     if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
76         free(queueNode->frame);
77         free(queueNode);
78         return NULL;
79     }
80     if (peerAddr != NULL) {
81         if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
82             free(queueNode->frame);
83             free(queueNode);
84             return NULL;
85         }
86     }
87     queueNode->socketIndex = socketIndex;
88 
89     return queueNode;
90 }
91 
/* Release a QueueNode together with the frame buffer it owns. A NULL node is ignored. */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }
    free(queueNode->frame);
    free(queueNode);
}
99 
/*
 * Forward a message to the callback registered on the session.
 *
 * The callback receives the session id, the message type and the message.
 * When the session or its msgReceiver is NULL the message is dropped and
 * only an info log is written.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        LOGI(TAG, "session is NULL");
    } else if (session->msgReceiver == NULL) {
        LOGI(TAG, "msgReceiver is NULL");
    } else {
        session->msgReceiver(session->sessionId, msgType, msg);
    }
}
114 
/*
 * Reset the session's transfer-rate bookkeeping when a new batch of work starts.
 *
 * Nothing is reset while work is still outstanding: a client session with a
 * non-empty pending list (or, with NSTACKX_SMALL_FILE_SUPPORT, a non-empty small
 * file list), or any session with a non-empty transfer chain. Otherwise the byte
 * and transfer counters are cleared and startTs is set to the current
 * CLOCK_MONOTONIC time.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }

    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    LOGI(TAG, "begin to calulate transfer rate");
    session->transCount = 0;
    session->bytesTransferred = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
135 
#ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start a transfer from the small-file list.
 *
 * The list is only drained when a regular file list is already being processed
 * or no regular file list is pending. Entries are popped one by one until
 * DFileStartTrans() succeeds; each entry that fails to start is reported with
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    const uint8_t mayStart = (session->fileListProcessingCnt > 0 || session->fileListPendingCnt == 0);
    if (!mayStart) {
        return NSTACKX_FALSE;
    }

    while (!ListIsEmpty(&session->smallFileLists)) {
        FileListInfo *candidate = (FileListInfo *)ListPopFront(&session->smallFileLists);
        session->smallListPendingCnt--;
        if (candidate == NULL) {
            continue;
        }

        int32_t ret = DFileStartTrans(session, candidate);
        if (ret == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }
        LOGE(TAG, "DFileStartTrans fail, error: %d", ret);

        /* Report the failed list to the application before releasing it. */
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = ret;
        failMsg.fileList.files = (const char **)candidate->files;
        failMsg.fileList.fileNum = candidate->fileNum;
        failMsg.fileList.userData = candidate->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(candidate);
    }
    return NSTACKX_FALSE;
}
#endif
165 
/*
 * Start the next transfer from the pending file lists.
 *
 * Entries are popped from the front until one is accepted by DFileStartTrans()
 * or the list runs empty. Each entry that fails to start is reported with
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 */
static void SendPendingList(DFileSession *session)
{
    while (!ListIsEmpty(&session->pendingFileLists)) {
        FileListInfo *candidate = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        session->fileListPendingCnt--;
        if (candidate == NULL) {
            continue;
        }

        int32_t ret = DFileStartTrans(session, candidate);
        if (ret == NSTACKX_EOK) {
            return;
        }
        LOGE(TAG, "DFileStartTrans fail, error: %d", ret);

        /* Report the failed list to the application before releasing it. */
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = ret;
        failMsg.fileList.files = (const char **)candidate->files;
        failMsg.fileList.fileNum = candidate->fileNum;
        failMsg.fileList.userData = candidate->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(candidate);
    }
}
190 
/*
 * Log the queue counters and start the next queued transfer.
 *
 * With NSTACKX_SMALL_FILE_SUPPORT the small-file list is tried first and the
 * pending list is used only when no small-file transfer was started. Without
 * it, only the pending list is used.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
206 
/*
 * Report session-wide progress with DFILE_ON_SESSION_IN_PROGRESS.
 *
 * Total and transferred byte counts are queried from the file manager. The
 * notification is sent only when both queries succeed, at least one byte has
 * been transferred, and the transferred count does not exceed the total.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }

    if (progress.transferUpdate.bytesTransferred > 0 &&
        progress.transferUpdate.bytesTransferred <= progress.transferUpdate.totalBytes) {
        NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
    }
}
218 
/*
 * Fill msg->transferUpdate for messages that end a file transfer.
 *
 * Only FILE_RECEIVED, FILE_RECEIVE_FAIL, FILE_SEND_FAIL and FILE_SENT are
 * handled; any other type, or a NULL argument, leaves msg untouched.
 *
 * For FILE_RECEIVED / FILE_SENT the transfer's own total is used for both
 * totalBytes and bytesTransferred when it is non-zero. In every other case the
 * numbers come from the file manager, and nothing beyond transId is written if
 * that query fails.
 *
 * For FILE_SENT a DFILE_ON_TRANS_IN_PROGRESS notification is emitted as well;
 * when the file list has vtransFlag set, the reported trans ids are replaced by
 * vtransRealTransId.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
                                 DFileTransMsgType msgType, DFileTransMsg *msg)
{
    if (session == NULL || dFileTrans == NULL || msg == NULL) {
        return;
    }
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVED:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
            break;
        default:
            return;
    }

    msg->transferUpdate.transId = dFileTrans->transId;

    uint64_t total = 0;
    uint64_t done = 0;
    uint8_t resolved = NSTACKX_FALSE;

    if (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT) {
        total = DFileTransGetTotalBytes(dFileTrans);
        if (total > 0) {
            /* A finished transfer has moved all of its bytes. */
            done = total;
            resolved = NSTACKX_TRUE;
        }
    }

    if (!resolved &&
        FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &done) != NSTACKX_EOK) {
        return;
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = done;

    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
257 
/*
 * Wake the threads that push data out for this session.
 *
 * The outbound-queue semaphore of the given socket is always posted. When the
 * transfer is a sender and the session has more than one client send thread,
 * the sendWait semaphore of the first (clientSendThreadNum - 1) entries of
 * sendThreadPara is posted too.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);

    if (!isSender || session->clientSendThreadNum <= 1) {
        return;
    }

    uint16_t idx = 0;
    while (idx < session->clientSendThreadNum - 1) {
        SemPost(&session->sendThreadPara[idx].sendWait);
        idx++;
    }
}
267 
/*
 * Account for a finished transfer and, once the session is idle, report the
 * overall transfer rate.
 *
 * Only FILE_SENT and END messages are counted. totalBytes is added to
 * session->bytesTransferred (saturating at UINT64_MAX) and transCount is
 * incremented. No rate is reported while the transfer chain is non-empty, or
 * while a client session still has queued file lists.
 *
 * Otherwise the elapsed time since session->startTs is measured and the rate is
 * computed as bytes / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / elapsed ms. It is
 * logged and delivered with DFILE_ON_SESSION_TRANSFER_RATE. Nothing is reported
 * when the elapsed time is 0 ms.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }
    if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
        session->bytesTransferred += totalBytes;
    } else {
        session->bytesTransferred = UINT64_MAX;
    }
    session->transCount++;
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
        || !ListIsEmpty(&session->smallFileLists))) {
        return;
    }
#else
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
        return;
    }
#endif
    struct timespec endTs;
    ClockGetTime(CLOCK_MONOTONIC, &endTs);

    uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
    if (spendTime == 0) {
        return;
    }
    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
    /* uint64_t is not necessarily unsigned long long; cast so the argument matches %llu. */
    LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
         session->transCount, (unsigned long long)session->bytesTransferred, spendTime, rate);
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    /* Converting an out-of-range double to uint32_t is undefined; saturate instead. */
    msgData.rate = (rate >= (double)UINT32_MAX) ? UINT32_MAX : (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);
}
307 
/*
 * Tear down a transfer once it has reached a terminal message.
 *
 * Terminal messages are FILE_RECEIVE_FAIL, FILE_SENT, FILE_SEND_FAIL and END;
 * any other type is ignored. For a terminal message the trans id is marked
 * STATE_TRANS_DONE, the owning peer's currentTransCount is decremented, and the
 * transfer is unlinked and destroyed.
 *
 * On a client session the matching processing counter is decremented and the
 * next queued file list is started. Finally the session transfer rate
 * bookkeeping is updated.
 *
 * dFileTrans must not be used by the caller after this returns for a terminal
 * message.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_END:
            break;
        default:
            return;
    }

    /* Read everything needed from the transfer before it is destroyed. */
    const uint8_t isSmallList = dFileTrans->fileList->smallFlag;
    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        LOGE(TAG, "set trans id state fail");
    }
    PeerInfo *owner = (PeerInfo *)dFileTrans->context;
    owner->currentTransCount--;
    ListRemoveNode(&dFileTrans->list);
    const uint64_t transBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (isSmallList == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }
    CalculateSessionTransferRate(session, transBytes, msgType);
}
331 
/*
 * Callback for events raised by a DFileTrans.
 *
 * The owning peer and session are taken from dFileTrans->context. The message's
 * progress fields are filled in first. The event is then either translated into
 * the matching session-level notification, used to wake the send threads, or
 * (for FILE_SEND_ACK) handed to ProcessSessionTrans(). Unhandled types are only
 * logged.
 *
 * Last, CheckTransDone() runs and may destroy dFileTrans.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
                              DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *owner = peer->session;

    UpdateMsgProcessInfo(owner, dFileTrans, msgType, msg);

    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            WakeSendThread(owner, dFileTrans->isSender, peer->socketIndex);
            break;
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            NotifyMsgRecver(owner, DFILE_ON_FILE_LIST_RECEIVED, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED:
            /* The receiver has all of the file data. */
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_FAIL, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SENT:
            /* The sender has sent the TRANSFER DONE ACK frame and is finished. */
            NoticeSessionProgress(owner);
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_FAIL, msg);
            break;
        case DFILE_TRANS_MSG_IN_PROGRESS:
            NotifyMsgRecver(owner, DFILE_ON_TRANS_IN_PROGRESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            ProcessSessionTrans(owner, dFileTrans->transId);
            break;
        default:
            LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            break;
    }

    CheckTransDone(owner, dFileTrans, msgType);
}
373 
ServerSettingTimeoutHandle(void * data)374 static void ServerSettingTimeoutHandle(void *data)
375 {
376     PeerInfo *peerInfo = data;
377     ListRemoveNode(&peerInfo->list);
378     TimerDelete(peerInfo->settingTimer);
379     peerInfo->session->peerInfoCnt--;
380     free(peerInfo);
381     LOGI(TAG, "DFileServer Setting Negotationion timeout");
382 }
383 
ClientSettingTimeoutHandle(void * data)384 static void ClientSettingTimeoutHandle(void *data)
385 {
386     PeerInfo *peerInfo = data;
387     uint8_t cnt = peerInfo->settingTimeoutCnt++;
388     DFileMsg msgData;
389     uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
390     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
391     if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
392         TimerDelete(peerInfo->settingTimer);
393         peerInfo->settingTimer = NULL;
394         peerInfo->settingTimeoutCnt = 0;
395         msgData.errorCode = NSTACKX_EFAILED;
396         LOGI(TAG, "DFileClient Setting Negotationion timeout");
397         NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
398     } else {
399         DFileSessionSendSetting(peerInfo);
400         LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
401         if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
402             msgData.errorCode = NSTACKX_EFAILED;
403             NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
404             LOGE(TAG, "Timer setting timer fail");
405         }
406     }
407 }
408 
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)409 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
410 {
411     List *pos = NULL;
412     DFileTrans *trans = NULL;
413 
414     if (dFileTransChain == NULL || transId == 0) {
415         return NULL;
416     }
417 
418     LIST_FOR_EACH(pos, dFileTransChain) {
419         trans = (DFileTrans *)pos;
420         if (trans->transId == transId) {
421             return trans;
422         }
423     }
424     return NULL;
425 }
426 
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)427 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
428 {
429     List *pos = NULL;
430     PeerInfo *peerInfo = NULL;
431     LIST_FOR_EACH(pos, &session->peerInfoChain) {
432         peerInfo = (PeerInfo *)pos;
433         if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
434             peerInfo->session->sessionId == session->sessionId) {
435             return peerInfo;
436         }
437     }
438     return NULL;
439 }
440 
/*
 * Encode a setting frame for the peer, send it, and make sure a setting timer
 * is running.
 *
 * The frame carries the peer's connType and local MTU, the session's
 * LINK_SEQUENCE capability bit, dataFrameSize 0 and the RECV_FEEDBACK
 * capsCheck bit. If the write returns neither the frame length nor
 * NSTACKX_EAGAIN, DFILE_ON_CONNECT_FAIL is reported and no timer is started.
 *
 * When the peer has no setting timer yet, one is created: on a client session
 * it runs ClientSettingTimeoutHandle after the negotiate timeout, on any other
 * session it runs ServerSettingTimeoutHandle after
 * MAX_SERVER_NEGOTIATE_VALID_TIMEOUT. Callers can detect timer creation failure
 * through peerInfo->settingTimer being NULL afterwards.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));

    /*
     * Zero the whole parameter block first: only some of its fields are assigned
     * below, and the encoder must not be handed indeterminate values for the rest.
     */
    SettingFrame settingFramePara;
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = peerInfo->session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
        return;
    }

    /* A timer that is already running is left alone. */
    if (peerInfo->settingTimer != NULL) {
        return;
    }

    if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, timeout,
                                            NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
        if (peerInfo->settingTimer == NULL) {
            LOGE(TAG, "setting timmer creat fail");
            data.errorCode = NSTACKX_EFAILED;
            NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
        }
    } else {
        peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                                            NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
        if (peerInfo->settingTimer == NULL) {
            /* No notification on the server side; the failure is at least logged. */
            LOGE(TAG, "server setting timer create fail");
        }
    }
}
482 
/*
 * Apply a negotiated DFileConfig to a peer and to the session's file manager.
 *
 * Peer side: maxSendRate and dataFrameSize are copied from the config, the
 * integral loss-rate history and fast-start counter are cleared, and
 * measureBefore is set to the current CLOCK_MONOTONIC time. The initial
 * sendRate is the configured rate divided by the WLAN or P2P divisor. Unless
 * built with NSTACKX_WITH_LITEOS, a WLAN peer with no Wi-Fi rate yet gets a
 * rate derived from NSTACKX_WLAN_INIT_RATE instead. The result is never below
 * NSTACKX_MIN_SENDRATE.
 *
 * File manager side: the max frame length is set to the peer's data frame size
 * and, on a server session, the receive parameters are set for connType.
 * Failures of these two calls are only logged.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    const int isWlan = (connType == CONNECT_TYPE_WLAN);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;

    if (isWlan) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    if (isWlan && !peerInfo->gotWifiRate) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        LOGE(TAG, "failed to set max frame length");
    }

    LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
         connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);

    if (session->sessionType == DFILE_SESSION_TYPE_SERVER &&
        FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        LOGE(TAG, "failed to set recv para");
    }
}
519 
/*
 * Record the negotiation state and the values from a decoded setting frame on
 * the peer.
 *
 * The peer's mtu, connType and remoteSessionId are taken from the frame.
 * mtuInuse becomes the smaller of the local MTU and the peer's MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    const uint16_t ownMtu = peerInfo->localMtu;

    peerInfo->state = state;
    peerInfo->connType = settingFrame->connType;
    peerInfo->remoteSessionId = settingFrame->header.sessionId;
    peerInfo->mtu = settingFrame->mtu;

    if (ownMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = ownMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }
}
529 
/*
 * Client-side handling of the server's setting frame (the negotiation reply).
 *
 * The frame is dropped when it cannot be decoded, advertises an MTU of 0, or
 * comes from an address with no known peer. Otherwise:
 *  - the peer's remote version is recorded and its setting timer is stopped;
 *  - the peer is marked SETTING_NEGOTIATED and takes over the frame's values;
 *  - every transfer in the session's chain is given the MTU now in use;
 *  - DFILE_ON_CONNECT_SUCCESS is reported;
 *  - a DFileConfig is looked up for the MTU / connection type and applied;
 *  - the session's LINK_SEQUENCE capability is cleared unless the server's
 *    reply has that bit set.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    /* Frames that fail to decode (e.g. unsupported version) or carry no MTU are ignored. */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }
    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        LOGE(TAG, "recv unknow peer setting, maybe be attacked");
        return;
    }
    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
    /* The reply arrived, so the retransmission timer is no longer needed. */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            LOGE(TAG, "set trans mtu failed");
        }
    }
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }
    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* The bit is set in the reply, so the capability is kept. */
        LOGI(TAG, "server replies support Link Sequence");
    } else {
        LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
}
572 
/*
 * Local MTU for a socket protocol.
 *
 * UDP and TCP use NSTACKX_DEFAULT_FRAME_SIZE. D2D is not supported: an error
 * is logged and 0 is returned. Any other protocol also yields 0.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    switch (protocol) {
        case NSTACKX_PROTOCOL_UDP:
        case NSTACKX_PROTOCOL_TCP:
            return NSTACKX_DEFAULT_FRAME_SIZE;
        case NSTACKX_PROTOCOL_D2D:
            /* D2D would need its own MTU query. */
            LOGE(TAG, "d2d not support");
            return 0;
        default:
            return 0;
    }
}
587 
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)588 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
589     uint16_t connType, uint8_t socketIndex)
590 {
591     if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
592         return NULL;
593     }
594     PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
595     if (peerInfo == NULL) {
596         return NULL;
597     }
598     peerInfo->session = session;
599     peerInfo->dstAddr = *dstAddr;
600     peerInfo->connType = connType;
601     peerInfo->socketIndex = socketIndex;
602     peerInfo->localMtu = DFileGetMTU(session->protocol);
603     session->peerInfoCnt++;
604     peerInfo->gotWifiRate = 0;
605     peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
606     peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
607 
608     if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
609         peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
610         LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
611     }
612 
613     if (peerMtu == 0) {
614         return peerInfo;
615     }
616     peerInfo->mtu = peerMtu;
617     peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
618     return peerInfo;
619 }
620 
/*
 * Adjust the session capabilities to what the client's setting frame offers.
 *
 * The session's LINK_SEQUENCE capability is cleared when the frame does not
 * carry that bit. The RECV_FEEDBACK bit of session->capsCheck is set or cleared
 * to mirror the frame.
 *
 * NOTE(review): RECV_FEEDBACK is tested on hostSettingFrame->capability, while
 * DFileSessionSendSetting() in this file places that flag in capsCheck.
 * Confirm that the decoded frame really exposes it through `capability`.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    const int wantsLinkSeq = (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) != 0;
    const int hasRecvFeedback = (hostSettingFrame->capability & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) != 0;

    if (!wantsLinkSeq) {
        LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    } else {
        LOGI(TAG, "client wants to enable Link Sequence");
    }

    if (!hasRecvFeedback) {
        LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
638 
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)639 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
640     uint8_t socketIndex)
641 {
642     PeerInfo *peerInfo = NULL;
643 
644     peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
645     if (peerInfo == NULL) {
646         return NULL;
647     }
648 
649     peerInfo->remoteSessionId = frame->header.sessionId;
650     peerInfo->state = SETTING_NEGOTIATING;
651     DFileSessionSendSetting(peerInfo);
652     if (peerInfo->settingTimer == NULL) {
653         free(peerInfo);
654         session->peerInfoCnt--;
655         return NULL;
656     }
657     ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
658     return peerInfo;
659 }
660 
/*
 * Server-side handling of a client's setting frame.
 *
 * The frame is dropped when it cannot be decoded or advertises an MTU of 0.
 *  - Known peer: once it has been answered MAX_NEGOTIATE_TIMEOUT_COUNT times
 *    the frame is dropped. Otherwise the peer is refreshed from the frame
 *    (state SETTING_NEGOTIATING) and our setting frame is sent again.
 *  - Unknown peer: a new PeerInfo is allocated, which also sends our setting
 *    frame. Allocation failure ends the handling.
 *
 * For every answered frame the peer's settingTimeoutCnt is incremented, the
 * remote version is stored, the matching DFileConfig is applied when one is
 * found, and the session capabilities are adjusted to the client's offer.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame decoded;
    DFileConfig config;
    (void)memset_s(&decoded, sizeof(decoded), 0, sizeof(decoded));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));
    LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &decoded) != NSTACKX_EOK || decoded.mtu == 0) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        peer = AllocPeerInfo(session, peerAddr, &decoded, socketIndex);
        if (peer == NULL) {
            return;
        }
    } else if (peer->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
        LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
        return;
    } else {
        DFileSessionSetPeerInfo(peer, SETTING_NEGOTIATING, &decoded);
        DFileSessionSendSetting(peer);
    }

    peer->settingTimeoutCnt++;
    LOGI(TAG, "DFileServer response Setting Frame. count %u", peer->settingTimeoutCnt);
    peer->remoteDFileVersion = decoded.dFileVersion;
    if (GetDFileConfig(&config, peer->mtuInuse, decoded.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, decoded.connType, peer);
    }
    HandleLinkSeqCap(session, &decoded);
}
699 
/*
 * Dispatch a received setting frame to the server or client handler according
 * to the session type. Frames on any other session type are ignored.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    switch (session->sessionType) {
        case DFILE_SESSION_TYPE_SERVER:
            DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
            break;
        case DFILE_SESSION_TYPE_CLIENT:
            DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
            break;
        default:
            break;
    }
}
711 
/*
 * React to an RST frame carrying NSTACKX_DFILE_WITHOUT_SETTING_ERROR.
 *
 * An RST from an address with no known peer is ignored (and logged). Otherwise
 * the MTU of every transfer in the session is set to 0, the peer's setting
 * timer is stopped, the peer goes back to SETTING_NEGOTIATING, and a new
 * setting frame is sent to it.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *pos = NULL;

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        LOGE(TAG, "recv unknow peer rst, maybe be attacked");
        return;
    }

    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        /*
         * When the client receives the peer's RST NSTACKX_DFILE_WITHOUT_SETTING_ERROR,
         * set the transfer MTU to 0; it is set again after the new setting negotiation.
         */
        if (DFileTransSetMtu(trans, 0) != NSTACKX_EOK) {
            LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    /*
     * peerInfo is already the first entry of peerInfoChain matching this address
     * and session id, so there is no need to walk the chain a second time.
     */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    peerInfo->state = SETTING_NEGOTIATING;
    LOGD(TAG, "Send Setting Frame");
    DFileSessionSendSetting(peerInfo);
}
747 
/*
 * Decode a received RST frame and react to its error code.
 * Only NSTACKX_DFILE_WITHOUT_SETTING_ERROR is acted upon; every other code is
 * just logged. A frame that fails to decode is silently dropped.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t rstCode = 0;

    if (DecodeRstFrame((RstFrame *)dFileFrame, &rstCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    /* The transfer id is only used for the debug trace below. */
    uint16_t transId = ntohs(dFileFrame->header.transId);
    LOGD(TAG, "handle RST (%u) frame, transId %u", rstCode, transId);

    if (rstCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
    } else {
        LOGE(TAG, "Unspported error code %u", rstCode);
    }
}
767 
/*
 * Apply a decoded back-pressure report to the first `count` entries of
 * session->stopSendCnt while holding backPressLock.
 *
 * When backPress.recvListOverIo is 1 each counter is incremented; otherwise
 * each counter is cleared. Nothing is changed if the lock cannot be taken.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    uint8_t overIo = (backPress.recvListOverIo == 1) ? NSTACKX_TRUE : NSTACKX_FALSE;
    for (uint32_t i = 0; i < count; i++) {
        if (overIo) {
            session->stopSendCnt[i]++;
        } else {
            session->stopSendCnt[i] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
794 
/*
 * Handle a back-pressure frame from a peer.
 * The frame is ignored when the sender is not a known peer or when it cannot
 * be decoded; otherwise the report is applied to the stop-send counters of
 * session->clientSendThreadNum entries and then logged.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    /* The lookup only validates the sender; the peer entry itself is not used. */
    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        LOGE(TAG, "can't get valid peerinfo");
        return;
    }

    DataBackPressure pressure;
    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &pressure) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, pressure, session->clientSendThreadNum);

    LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
         pressure.recvListOverIo, pressure.recvBufThreshold,
         pressure.stopSendPeriod);
}
817 
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)818 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
819 {
820     DFileTransPara transPara;
821 
822     if (peerInfo == NULL) {
823         return NULL;
824     }
825     (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
826     transPara.isSender = isSender;
827     transPara.transId = transId; /* for receiver, use transId of sender */
828     transPara.fileManager = session->fileManager;
829     transPara.writeHandle = DFileWriteHandle;
830     transPara.msgReceiver = DTransMsgReceiver;
831     transPara.connType = peerInfo->connType;
832     transPara.context = peerInfo;
833     transPara.session = session;
834     transPara.onRenameFile = session->onRenameFile;
835 
836     ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
837     return DFileTransCreate(&transPara);
838 }
839 
DFileSessionHandleFrame(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr)840 static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
841 {
842     PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
843     if (peerInfo == NULL) {
844         LOGI(TAG, "can't find peerInfo");
845         return NSTACKX_EFAILED;
846     }
847 
848     if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_SERVER && peerInfo->state == SETTING_NEGOTIATING) {
849         peerInfo->state = SETTING_NEGOTIATED;
850         TimerDelete(peerInfo->settingTimer);
851         peerInfo->settingTimer = NULL;
852     }
853 
854     uint16_t transId = ntohs(dFileFrame->header.transId);
855     if (transId == 0) {
856         LOGE(TAG, "transId is 0");
857         return NSTACKX_EFAILED;
858     }
859 
860     DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), transId);
861     if (trans == NULL) {
862         if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
863             /* Only HEADER frame can start dfile transfer (receiver) */
864             LOGE(TAG, "trans %u is NULL && type is %u", transId, dFileFrame->header.type);
865             return NSTACKX_EFAILED;
866         }
867         if (IsTransIdDone(session, transId) == NSTACKX_EOK) {
868             return NSTACKX_EFAILED;
869         }
870         trans = CreateTrans(transId, session, peerInfo, NSTACKX_FALSE);
871         if (trans == NULL) {
872             DFileMsg data;
873             (void)memset_s(&data, sizeof(data), 0, sizeof(data));
874             data.errorCode = NSTACKX_ENOMEM;
875             NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &data);
876             LOGE(TAG, "trans is NULL");
877             return NSTACKX_EFAILED;
878         }
879         if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
880             LOGE(TAG, "set trans mtu failed");
881         }
882         CalculateSessionTransferRatePrepare(session);
883         ListInsertTail(&(session->dFileTransChain), &(trans->list));
884         peerInfo->currentTransCount++;
885     }
886 
887     return HandleDFileFrame(trans, dFileFrame);
888 }
889 
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)890 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
891 {
892     PeerInfo *peerInfo = context;
893     DFileSession *session = peerInfo->session;
894     QueueNode *queueNode = NULL;
895     struct sockaddr_in peerAddr;
896 
897     peerAddr = peerInfo->dstAddr;
898     queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
899     if (queueNode == NULL) {
900         return NSTACKX_ENOMEM;
901     }
902 
903     if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
904         LOGE(TAG, "pthread mutex lock failed");
905         free(queueNode->frame);
906         free(queueNode);
907         return NSTACKX_EFAILED;
908     }
909     ListInsertTail(&session->outboundQueue, &queueNode->list);
910     session->outboundQueueSize++;
911     if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
912         /* Don't need to free node, as it's mount to outboundQueue */
913         LOGE(TAG, "pthread mutex unlock failed");
914         return NSTACKX_EFAILED;
915     }
916     SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
917     return (int32_t)len;
918 }
919 
BindMainLoopToTargetCpu(void)920 static void BindMainLoopToTargetCpu(void)
921 {
922     int32_t cpu;
923     int32_t cpus = GetCpuNum();
924     if (cpus >= FIRST_CPU_NUM_LEVEL) {
925         return;
926     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
927         cpu = CPU_IDX_0;
928     } else {
929         return;
930     }
931     StartThreadBindCore(cpu);
932 }
933 
/*
 * Compute how long the main loop may wait in epoll, in the unit of
 * DEFAULT_WAIT_TIME_MS.
 *
 * Returns 0 when active reading is enabled and inbound data is queued.
 * Otherwise returns the smallest non-negative timeout reported by the
 * session's transfers, never more than DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t candidate = DFileTransGetTimeout((DFileTrans *)node);
        /* Negative timeouts are not taken into account. */
        if (candidate >= 0 && candidate < shortest) {
            shortest = candidate;
        }
    }

    return (shortest > DEFAULT_WAIT_TIME_MS) ? (int64_t)DEFAULT_WAIT_TIME_MS : shortest;
}
956 
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)957 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
958 {
959     List *tmp = NULL;
960     List *pos = NULL;
961     LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
962         DFileTrans *trans = (DFileTrans *)pos;
963         if (trans->transId != exceptTransId) {
964             DFileTransProcess(trans);
965         }
966     }
967 }
968 
NotifyBindPort(const DFileSession * session)969 static void NotifyBindPort(const DFileSession *session)
970 {
971     DFileMsg data;
972     int32_t socketNum;
973     (void)memset_s(&data, sizeof(data), 0, sizeof(data));
974     socketNum = 1;
975 
976     for (int i = 0; i < socketNum; i++) {
977         data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
978         data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
979     }
980     NotifyMsgRecver(session, DFILE_ON_BIND, &data);
981 }
982 
DFileMainLoop(void * arg)983 void *DFileMainLoop(void *arg)
984 {
985     DFileSession *session = arg;
986     int32_t ret = NSTACKX_EOK;
987     DFileMsg msgData;
988     (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
989     uint8_t isBind = NSTACKX_FALSE;
990     LOGI(TAG, "main thread start");
991     SetThreadName(DFFILE_MAIN_THREAD_NAME);
992     SetMaximumPriorityForThread();
993     SetTidToBindInfo(session, POS_MAIN_THERD_START);
994     NotifyBindPort(session);
995     while (!session->closeFlag) {
996         int64_t minTimeout = GetEpollWaitTimeOut(session);
997         ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
998         if (ret == NSTACKX_EFAILED) {
999             LOGE(TAG, "epoll wait failed");
1000             break;
1001         }
1002         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1003             BindMainLoopToTargetCpu();
1004             isBind = NSTACKX_TRUE;
1005         }
1006         ProcessSessionTrans(session, 0);
1007         if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
1008             session->partReadFlag = NSTACKX_TRUE;
1009             ReadEventHandle(session);
1010             session->partReadFlag = NSTACKX_FALSE;
1011         }
1012     }
1013 
1014     if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
1015         msgData.errorCode = NSTACKX_EFAILED;
1016         NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
1017     }
1018 
1019     /* Notify sender thread to terminate */
1020     PostOutboundQueueWait(session);
1021 
1022     /* Unblock "select" and notify receiver thread to terminate */
1023     NotifyPipeEvent(session);
1024     return NULL;
1025 }
1026 
/*
 * Reset the peer's amended send rate to its current send rate and log the
 * send rate, the measured send frame rate and the new amended value.
 */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    /* The amended rate simply follows sendRate here. */
    peerInfo->amendSendRate = peerInfo->sendRate;

    LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
         peerInfo->sendRate, peerInfo->sendFrameRate, peerInfo->amendSendRate);
}
1033 
/*
 * Periodically compute and log the sending statistics of a client peer.
 *
 * Does nothing on non-client sessions or while the time elapsed since
 * peerInfo->startTime does not exceed peerInfo->rateStateInterval. Once the
 * interval has passed it:
 *   - updates the file manager IO read rate (peer with socket index 0 only)
 *     and clears the related IO counters;
 *   - derives the send frame rate and the send rate in MB/s from sendCount;
 *   - turns the accumulated qdisc value into an average when samples exist;
 *   - logs the statistics, resets the amended send rate, clears the session
 *     and peer statistics and restarts the measurement period.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* just calculate io rate */
    if (measureElapse <= peerInfo->rateStateInterval) {
        return;
    }

    /* Work on a widened snapshot so every computation below sees the same value. */
    uint64_t sendCount = (uint64_t)peerInfo->sendCount;

    /* ONLY PEER 0 calculate io rate */
    if (peerInfo->socketIndex == 0) {
        session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
            session->fileManager->sendListFullTimes);
        session->fileManager->sendListFullTimes = 0;
        session->fileManager->iorBytes = 0;
    }

    peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
    peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
        NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
    /* qdiscAveLeft accumulates samples; convert it into an average only when samples exist. */
    if (peerInfo->qdiscSearchNum != 0) {
        peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
    }

    /*
     * Every literal fragment ends with a separator so that adjacent fields do
     * not run together in the log output. The "%llu" for sendCount is fed
     * with the uint64_t snapshot taken above.
     */
    LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu, "
              "measureElapse %llu sendFrameRate %u %uMB/s, "
              "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u, "
              "min qdisc %u max qdisc %u search num %u ave qdisc %u "
              "totalRecvBlocks %llu socket:%u "
              "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
         peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
         sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
         NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
         NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
         NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
         peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
         NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
         peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);

    AmendPeerInfoSendRate(peerInfo);
    ClearSessionStats(session);
    ClearPeerinfoStats(peerInfo);
    /* Start the next measurement period. */
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
}
1085 
/*
 * Sum the retryCount of every transfer in the session's transfer chain and
 * store the total in peerInfo->allDtransRetryCount.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t retryTotal = 0;
    List *node = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        retryTotal += ((DFileTrans *)node)->retryCount;
    }

    peerInfo->allDtransRetryCount = retryTotal;
}
1097 
/*
 * Update receive-side rate statistics for a file DATA frame on a server
 * session. Other session types, other frame types and frames from unknown
 * peers are ignored.
 *
 * Two independent measurement windows are maintained, both compared against
 * peerInfo->rateStateInterval:
 *   - session->measureBefore: IO write rate/count of the file manager;
 *   - peerInfo->startTime:    receive frame rate and receive rate in MB/s.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        return;
    }

    /* The control frame timeout depends on the connection type. */
    uint32_t timeOut;
    if (peerInfo->connType == CONNECT_TYPE_P2P) {
        timeOut = NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT;
    } else {
        timeOut = NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;
    }

    peerInfo->recvCount++;

    /* Both windows below are measured against this single timestamp. */
    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);

    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
    if (measureElapse > peerInfo->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
        session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
            (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
        /* Keep track of the highest IO write rate seen so far. */
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
             session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    if (measureElapse > peerInfo->rateStateInterval) {
        uint64_t recvCount = (uint64_t)peerInfo->recvCount;
        peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        peerInfo->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1144 
CheckElapseTime(const struct timespec * before,uint64_t overRun)1145 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1146 {
1147     struct timespec now;
1148     ClockGetTime(CLOCK_MONOTONIC, &now);
1149     uint64_t elapseUs = GetTimeDiffUs(&now, before);
1150     elapseUs += overRun;
1151     if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1152         if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1153             LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1154         }
1155         return 0;
1156     } else {
1157         uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1158         return delta;
1159     }
1160 }
1161 
BindClientSendThreadToTargetCpu(uint32_t idx)1162 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1163 {
1164     int32_t cpu;
1165     int32_t cpus = GetCpuNum();
1166     if (cpus >= FIRST_CPU_NUM_LEVEL) {
1167         return;
1168     } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1169         cpu = CPU_IDX_1 + (int32_t)idx;
1170     } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1171         cpu = CPU_IDX_1;
1172     } else {
1173         return;
1174     }
1175     if (cpu > cpus) {
1176         cpu = cpus - 1;
1177     }
1178     StartThreadBindCore(cpu);
1179 }
1180 
/*
 * On a client session, restart the measurement period of the peer attached to
 * socketIndex and bind the calling send thread to its target CPU (index 0).
 * Nothing happens on other session types or when no peer is found.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        return;
    }

    ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
    BindClientSendThreadToTargetCpu(0);
}
1192 
TerminateMainThreadFatalInner(void * arg)1193 static void TerminateMainThreadFatalInner(void *arg)
1194 {
1195     DFileSession *session = (DFileSession *)arg;
1196     DFileSessionSetFatalFlag(session);
1197 }
1198 
/*
 * Ask for the session's fatal flag to be set by posting
 * TerminateMainThreadFatalInner on the session's event chain. When the event
 * cannot be posted, the flag is set directly from the calling thread.
 */
static void PostFatalEvent(DFileSession *session)
{
    int32_t postRet = PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session);
    if (postRet == NSTACKX_EOK) {
        return;
    }

    LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1206 
GetSocketWaitMs(uint32_t threadNum)1207 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1208 {
1209     if (threadNum > 1) {
1210         return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1211     }
1212     return DEFAULT_WAIT_TIME_MS;
1213 }
1214 
SetSendThreadName(uint32_t threadIdx)1215 static void SetSendThreadName(uint32_t threadIdx)
1216 {
1217     char name[MAX_THREAD_NAME_LEN] = {0};
1218     if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1219         LOGE(TAG, "sprintf send thead name failed");
1220     }
1221     SetThreadName(name);
1222 }
1223 
/* Start-up arguments handed to an additional send thread. */
typedef struct {
    struct DFileSession *session; /* session the thread sends for */
    uint32_t threadIdx;           /* index of the thread's slot in session->sendThreadPara */
} SendThreadCtx;
1228 
DFileAddiSenderHandle(void * arg)1229 static void *DFileAddiSenderHandle(void *arg)
1230 {
1231     SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
1232     struct DFileSession *session = sendThreadCtx->session;
1233     uint32_t threadIdx = sendThreadCtx->threadIdx;
1234     free(sendThreadCtx);
1235     int32_t ret = NSTACKX_EOK;
1236     LOGI(TAG, "send thread %u start", threadIdx);
1237     SetSendThreadName(threadIdx);
1238     SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
1239     List unsent;
1240     uint8_t canWrite = NSTACKX_FALSE;
1241     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1242     BindClientSendThreadToTargetCpu(threadIdx + 1);
1243     ListInitHead(&unsent);
1244     while (!session->addiSenderCloseFlag) {
1245         if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
1246             NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1247             SemWait(&session->sendThreadPara[threadIdx].sendWait);
1248             if (session->addiSenderCloseFlag) {
1249                 break;
1250             }
1251         }
1252         if (ret == NSTACKX_EAGAIN) {
1253             ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
1254             if (ret != NSTACKX_EOK || session->closeFlag) {
1255                 break;
1256             }
1257             if (!canWrite) {
1258                 ret = NSTACKX_EAGAIN;
1259                 continue;
1260             }
1261         }
1262         ret = SendDataFrame(session, &unsent, threadIdx, 0);
1263         if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
1264             break;
1265         }
1266         if (ret == NSTACKX_EAGAIN) {
1267             continue;
1268         }
1269         SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
1270     }
1271     if (ret < 0 && ret != NSTACKX_EAGAIN) {
1272         PostFatalEvent(session);
1273     }
1274     DestroyIovList(&unsent, session, threadIdx);
1275     return NULL;
1276 }
1277 
CloseAddiSendThread(struct DFileSession * session)1278 static void CloseAddiSendThread(struct DFileSession *session)
1279 {
1280     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1281         return;
1282     }
1283     session->addiSenderCloseFlag = NSTACKX_TRUE;
1284     for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1285         SemPost(&session->sendThreadPara[i].sendWait);
1286         SemPost(&session->sendThreadPara[i].semNewCycle);
1287         PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1288         SemDestroy(&session->sendThreadPara[i].sendWait);
1289         SemDestroy(&session->sendThreadPara[i].semNewCycle);
1290     }
1291 }
1292 
ErrorHandle(struct DFileSession * session,uint16_t cnt)1293 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1294 {
1295     while (cnt > 0) {
1296         SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1297         SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1298         PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1299         SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1300         SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1301         cnt--;
1302     }
1303 }
1304 
CreateAddiSendThread(struct DFileSession * session)1305 static int32_t CreateAddiSendThread(struct DFileSession *session)
1306 {
1307     uint16_t i;
1308     if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1309         return NSTACKX_EOK;
1310     }
1311     session->addiSenderCloseFlag = NSTACKX_FALSE;
1312     for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1313         SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1314         if (sendThreadCtx == NULL) {
1315             goto L_ERR_SENDER_THREAD;
1316         }
1317 
1318         if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1319             free(sendThreadCtx);
1320             goto L_ERR_SENDER_THREAD;
1321         }
1322 
1323         if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1324             SemDestroy(&session->sendThreadPara[i].sendWait);
1325             free(sendThreadCtx);
1326             goto L_ERR_SENDER_THREAD;
1327         }
1328 
1329         sendThreadCtx->session = session;
1330         sendThreadCtx->threadIdx = i;
1331         if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1332             LOGE(TAG, "Create sender thread failed");
1333             SemDestroy(&session->sendThreadPara[i].sendWait);
1334             SemDestroy(&session->sendThreadPara[i].semNewCycle);
1335             free(sendThreadCtx);
1336             goto L_ERR_SENDER_THREAD;
1337         }
1338     }
1339     return NSTACKX_EOK;
1340 
1341 L_ERR_SENDER_THREAD:
1342     session->addiSenderCloseFlag = NSTACKX_TRUE;
1343     ErrorHandle(session, i);
1344     return NSTACKX_EFAILED;
1345 }
1346 
/*
 * Fold one qdisc sample into the peer's qdisc statistics: minimum, maximum,
 * sample count and running sum (kept in qdiscAveLeft). A stored minimum of 0
 * is always replaced by the new sample.
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }

    if (qDiscLeft > peerInfo->qdiscMaxLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1360 
/*
 * Cap the peer's amended send rate by the value GetQdiscLen() reports for the
 * root queue of the peer's local interface.
 *
 * Nothing changes when the qdisc query fails or when mtuInuse is 0. Otherwise
 * the sample is recorded in the qdisc statistics, divided by the number of
 * MTUs per data frame (at least 1), and amendSendRate becomes the smaller of
 * that quotient and sendRate.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t qdiscLeft;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qdiscLeft) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, qdiscLeft);

    uint32_t qdiscRate = qdiscLeft / mtusPerFrame;
    if (peerInfo->sendRate < qdiscRate) {
        peerInfo->amendSendRate = peerInfo->sendRate;
    } else {
        peerInfo->amendSendRate = (int32_t)qdiscRate;
    }
}
1384 
/*
 * Bookkeeping performed after a send cycle on socketIndex.
 *
 * Marks the cycle as no longer running. When the frames sent in this interval
 * reached the (positive) amended send rate captured before the cycle, the
 * thread is paced through CheckElapseTime(); a carried-over overrun is
 * discarded first if decreaseStatus is 1. The interval counter is then reset
 * and the amended send rate refreshed from the qdisc.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    uint8_t quotaReached = (curAmendSendRate > 0) && (peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate);
    if (quotaReached) {
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->overRun = 0;
            peerInfo->decreaseStatus = 0;
        }
        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        /* CheckElapseTime() returns 0 exactly when it slept. */
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1403 
/*
 * Run one send round for the sender thread of socketIndex.
 *
 * 1. When CapsTcp() holds and data is left in `unsent`, that data is sent
 *    first with session->sendRemain raised; a failed/EAGAIN result or a
 *    closing session ends the round.
 * 2. Queued outbound frames are sent. Non-client sessions stop here.
 * 3. On a client whose peer has a non-zero MTU, data frames are sent if
 *    step 2 succeeded, the rate statistics are updated, and - unless the
 *    send failed, returned EAGAIN or the session is closing - the post-send
 *    bookkeeping runs and the additional send threads are released for a
 *    new cycle.
 *
 * Returns the result of the last send step, or NSTACKX_EFAILED when the
 * client peer cannot be found.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
                                     struct timespec *before, uint8_t socketIndex)
{
    int32_t sendRet;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        session->sendRemain = 1;
        sendRet = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if ((sendRet == NSTACKX_EFAILED || sendRet == NSTACKX_EAGAIN) || session->closeFlag) {
            LOGI(TAG, "ret is %d", sendRet);
            return sendRet;
        }
    }

    sendRet = SendOutboundFrame(session, preQueueNode);
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return sendRet;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    if (peer->mtuInuse == 0) {
        return sendRet;
    }

    if (sendRet == NSTACKX_EOK) {
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        sendRet = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }

    /* Capture the amended rate before the statistics update below. */
    int32_t amendRateSnapshot = peer->amendSendRate;
    DFileSendCalculateRate(session, peer);

    if ((sendRet == NSTACKX_EFAILED || sendRet == NSTACKX_EAGAIN) || session->closeFlag) {
        LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", sendRet, peer->intervalSendCount);
        return sendRet;
    }

    UpdateInfoAfterSend(session, peer, amendRateSnapshot, before, socketIndex);

    /* Post semNewCycle of every additional send thread. */
    uint16_t idx = 0;
    while (idx < session->clientSendThreadNum - 1) {
        SemPost(&session->sendThreadPara[idx].semNewCycle);
        idx++;
    }
    return sendRet;
}
1453 
/*
 * Block on the outbound semaphore of socketIndex when there is nothing to
 * send: no queue node in hand, an empty unsent list, no pending file data and
 * an empty outbound queue. Each such wait is counted in noPendingDataTimes.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL || !ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager) || session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1462 
/*
 * One-time preparation executed by a sender thread before it enters its send loop: names the thread,
 * refreshes the sender measure time for "socketIndex", raises the thread priority and, when the computed
 * slot lies inside the sender range, records the thread id in the session bind info.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t bindSlot = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    /* Slots at or beyond the end of the sender range are silently skipped. */
    if (bindSlot < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, bindSlot);
    }
}
1475 
/*
 * Tear-down executed by a sender thread right before it exits.
 *
 * Logs the total number of sent blocks, closes the additional send threads, destroys the remaining "unsent"
 * iov list and the pending "queueNode", and finally frees "arg" (the thread parameter block passed to the
 * thread entry point).
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    CloseAddiSendThread(session);
    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    /* The thread parameter block is owned by the sender thread, release it last. */
    free(arg);
}
1485 
DFileSenderHandle(void * arg)1486 void *DFileSenderHandle(void *arg)
1487 {
1488     DFileSession *session = ((SenderThreadPara*)arg)->session;
1489     uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1490     QueueNode *queueNode = NULL;
1491     List unsent;
1492     int32_t ret = NSTACKX_EOK;
1493     struct timespec before;
1494     uint8_t canWrite = NSTACKX_FALSE;
1495     uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1496     uint8_t isBind = NSTACKX_FALSE;
1497 
1498     if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1499         PostFatalEvent(session);
1500         return NULL;
1501     }
1502     ListInitHead(&unsent);
1503     DFileSenderPre(session, socketIndex);
1504     while (!session->closeFlag) {
1505         WaitNewSendData(queueNode, &unsent, session, socketIndex);
1506         if (session->closeFlag) {
1507             break;
1508         }
1509         if (!session->cycleRunning[socketIndex]) {
1510             ClockGetTime(CLOCK_MONOTONIC, &before);
1511         }
1512         if (ret == NSTACKX_EAGAIN) {
1513             ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1514             if (ret != NSTACKX_EOK || session->closeFlag) {
1515                 break;
1516             }
1517             if (!canWrite) {
1518                 ret = NSTACKX_EAGAIN;
1519                 continue;
1520             }
1521         }
1522         if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1523             session->transFlag == NSTACKX_TRUE) {
1524             BindClientSendThreadToTargetCpu(0);
1525             isBind = NSTACKX_TRUE;
1526         }
1527         ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1528         if (ret < 0 && ret != NSTACKX_EAGAIN) {
1529             PostFatalEvent(session);
1530             break;
1531         }
1532     }
1533     DFileSenderClose(session, queueNode, &unsent, arg);
1534     return NULL;
1535 }
1536 
/*
 * Remove every node from the frame list "head" and destroy it. A NULL or empty list is a no-op.
 *
 * NOTE(review): each list entry is cast straight to QueueNode, which presumes the List link is the first
 * member of QueueNode - confirm against the struct definition.
 */
static void ClearDFileFrameList(List *head)
{
    List *cur = NULL;
    List *next = NULL;

    if (head == NULL) {
        return;
    }
    if (ListIsEmpty(head)) {
        return;
    }
    LIST_FOR_EACH_SAFE(cur, next, head) {
        QueueNode *queueNode = (QueueNode *)cur;
        ListRemoveNode(&queueNode->list);
        DestroyQueueNode(queueNode);
    }
}
1550 
CheckDfileType(uint8_t type)1551 static inline int32_t CheckDfileType(uint8_t type)
1552 {
1553     if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1554         return NSTACKX_EFAILED;
1555     }
1556     return NSTACKX_EOK;
1557 }
1558 
/*
 * Validate a session type: NSTACKX_EOK when "type" lies in
 * [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER], NSTACKX_EFAILED otherwise.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    if (type >= DFILE_SESSION_TYPE_CLIENT && type <= DFILE_SESSION_TYPE_SERVER) {
        return NSTACKX_EOK;
    }
    return NSTACKX_EFAILED;
}
1566 
/*
 * Drain the frame list "head" and dispatch every frame to its handler until the list is empty or the
 * session is closing. Whatever is left on the list afterwards is destroyed.
 *
 * Dispatch rules, checked in this order:
 *   - payload length larger than NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader): node dropped;
 *   - invalid frame type or invalid session type: treated as a handling failure;
 *   - SETTING frame, RST frame with transId 0, BACK_PRESSURE frame: dedicated session handlers;
 *   - everything else: DFileSessionHandleFrame().
 *
 * The frame buffer is released here unless a FILE_DATA frame was handled successfully, in which case the
 * buffer has been handed over to the file manager. The queue node itself is always released.
 *
 * NOTE(review): "handleFrameRet" is deliberately kept across iterations exactly as before; the branches for
 * SETTING/RST/BACK_PRESSURE frames do not update it.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    int32_t handleFrameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }
        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *peerAddr = &node->peerAddr;
        /* Cached up front: the frame memory may be invalid once a data frame handler has failed. */
        uint8_t type = frame->header.type;

        if (ntohs(frame->header.length) > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            LOGE(TAG, "header length %u is too big", ntohs(frame->header.length));
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(type) != NSTACKX_EOK || CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            handleFrameRet = NSTACKX_EFAILED;
        } else if (type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, peerAddr, node->socketIndex);
        } else if (type == NSTACKX_DFILE_RST_FRAME && frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, peerAddr);
        } else if (type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, peerAddr);
        } else {
            /* For NSTACKX_DFILE_FILE_DATA_FRAME the frame may be freed by the handler when it fails. */
            handleFrameRet = DFileSessionHandleFrame(session, frame, peerAddr);
        }

        /* Only a successfully handled FILE_DATA frame has its buffer passed on to the file manager. */
        uint8_t bufferHandedOver = (handleFrameRet == NSTACKX_EOK && type == NSTACKX_DFILE_FILE_DATA_FRAME);
        if (!bufferHandedOver) {
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }
    ClearDFileFrameList(head);
}
1613 
ReadEventHandle(void * arg)1614 static void ReadEventHandle(void *arg)
1615 {
1616     DFileSession *session = arg;
1617     List newHead;
1618     ListInitHead(&newHead);
1619     struct timespec before, now;
1620     if (!session->partReadFlag) {
1621         NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1622     } else {
1623         ClockGetTime(CLOCK_MONOTONIC, &before);
1624     }
1625     while (session->inboundQueueSize && !session->closeFlag) {
1626         if (session->partReadFlag) {
1627             ClockGetTime(CLOCK_MONOTONIC, &now);
1628             if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
1629                 break;
1630             }
1631         }
1632         if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1633             LOGE(TAG, "PthreadMutexLock error");
1634             return;
1635         }
1636         ListMove(&session->inboundQueue, &newHead);
1637         session->recvBlockNumInner += session->inboundQueueSize;
1638         session->inboundQueueSize = 0;
1639         if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1640             LOGE(TAG, "PthreadMutexUnlock error");
1641             break;
1642         }
1643         ProcessDFileFrameList(session, &newHead);
1644     }
1645     ClearDFileFrameList(&newHead);
1646 }
1647 
/*
 * Wrap a received frame into a queue node and append it to the session inbound queue.
 *
 * Returns:
 *   NSTACKX_EOK     - node queued;
 *   NSTACKX_ENOMEM  - the queue already holds more than MAX_RECVBUF_COUNT nodes, or node creation failed;
 *   NSTACKX_EFAILED - locking or unlocking inboundQueueLock failed (on unlock failure the node stays queued).
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
                                    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
        /* Rate-limit the log: print only when the size is a multiple of MAX_NOMEM_PRINT. */
        if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
            LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
        }
        return NSTACKX_ENOMEM;
    }

    QueueNode *node = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    if (node == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        DestroyQueueNode(node);
        return NSTACKX_EFAILED;
    }
    ListInsertTail(&session->inboundQueue, &node->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* The node now belongs to the queue, so it must not be destroyed even when the unlock fails. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
1677 
/* For server sessions only: store the current monotonic time in session->measureBefore. */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1684 
/*
 * Bind the calling receive thread to a CPU core.
 *
 * Non-server sessions are always bound to CPU_IDX_2. For server sessions the core depends on the CPU count:
 *   count >= FIRST_CPU_NUM_LEVEL   -> no binding;
 *   count >= SECOND_CPU_NUM_LEVEL  -> CPU_IDX_1;
 *   count >= THIRD_CPU_NUM_LEVEL   -> CPU_IDX_0;
 *   otherwise                      -> no binding.
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t cpuCount = GetCpuNum();
    int32_t targetCpu;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        targetCpu = CPU_IDX_2;
        StartThreadBindCore(targetCpu);
        return;
    }

    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_1;
    } else if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_0;
    } else {
        return;
    }
    StartThreadBindCore(targetCpu);
}
1705 
/*
 * Ask the main loop to run ReadEventHandle() for this session.
 *
 * Nothing is posted while MAX_UNPROCESSED_READ_EVENT_COUNT or more read events are still outstanding.
 * Otherwise the outstanding counter is incremented and the event is posted. If posting fails the counter is
 * rolled back and mainLoopActiveReadFlag is set to NSTACKX_TRUE; on success the flag is cleared.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1720 
/*
 * Handle one buffer read from the socket: decode it as a DFile frame, queue it on the session inbound
 * queue, wake the main loop and update the receive rate statistics.
 *
 * Returns NSTACKX_EOK when the buffer was queued, and also when it was dropped because decoding failed or
 * the inbound queue reported NSTACKX_ENOMEM. Returns NSTACKX_EFAILED for any other queueing error.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
                                     struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *dFileFrame = NULL;

    if (DecodeDFileFrame(buf, bufLen, &dFileFrame) != NSTACKX_EOK) {
        /* A frame that cannot be decoded is dropped without failing the receiver. */
        LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    int32_t queueRet = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    switch (queueRet) {
        case NSTACKX_EOK:
            break;
        case NSTACKX_ENOMEM:
            /* Queue full or allocation failure: the frame is dropped silently. */
            return NSTACKX_EOK;
        default:
            LOGE(TAG, "frame add in bound queue failed :%d", queueRet);
            return NSTACKX_EFAILED;
    }

    PostReadEventToMainLoop(session);
    DFileRecvCalculateRate(session, dFileFrame, peerAddr);
    return NSTACKX_EOK;
}
1743 
/*
 * One-time preparation executed by the receiver thread before it enters its receive loop: names the
 * thread, refreshes the session measure time, raises the thread priority and records the thread id in the
 * session bind info at POS_RECV_THERD_START.
 */
static void DFileRecverPre(DFileSession *session)
{
    SetThreadName(DFFILE_RECV_THREAD_NAME);
    DFileReceiverUpdateSessionMeasureTime(session);

    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1751 
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for the receiver socket to become readable and report it via "canRead".
 * The accepted socket is used once session->acceptFlag is non-zero, otherwise socket[0] is used.
 * Returns the result of WaitSocketEvent().
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    if (session->acceptFlag != 0) {
        return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
    }
    return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1760 
/* Receive from the session socket; thin wrapper that forwards to DFileSocketRecvSP() and returns its result. */
int32_t DFileSocketRecv(DFileSession *session)
{
    int32_t recvRet = DFileSocketRecvSP(session);
    return recvRet;
}
1765 
/*
 * Accept a connection on socket[0] and store it in session->acceptSocket.
 *
 * On success acceptFlag is set to 1, TCP keep-alive is configured on the accepted descriptor and
 * NSTACKX_EOK is returned. Returns NSTACKX_EFAILED when no socket could be accepted.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);
    return NSTACKX_EOK;
}
1780 
DFileReceiverHandle(void * arg)1781 void *DFileReceiverHandle(void *arg)
1782 {
1783     DFileSession *session = arg;
1784     uint8_t canRead = NSTACKX_FALSE;
1785     int32_t ret = NSTACKX_EAGAIN;
1786     uint8_t isBind = NSTACKX_FALSE;
1787 
1788     LOGI(TAG, "recv thread start");
1789     DFileRecverPre(session);
1790     while (!session->closeFlag) {
1791         if (ret == NSTACKX_EAGAIN || !canRead) {
1792             ret = RcverWaitSocket(session, &canRead);
1793             if (ret != NSTACKX_EOK || session->closeFlag) {
1794                 break;
1795             }
1796         }
1797         if (!canRead) {
1798             continue;
1799         }
1800         if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1801             BindServerRecvThreadToTargetCpu(session);
1802             isBind = NSTACKX_TRUE;
1803         }
1804 
1805         ret = DFileSocketRecv(session);
1806         if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1807             break;
1808         }
1809     }
1810     LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect, session->recvBlockNumInner);
1811     if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1812         PostFatalEvent(session);
1813     }
1814 
1815     LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1816     return NULL;
1817 }
1818 
DFileControlHandle(void * arg)1819 void *DFileControlHandle(void *arg)
1820 {
1821     SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1822     DFileSession *session = arg;
1823     if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1824         DFileSenderControlHandle(session);
1825     } else {
1826         DFileReceiverControlHandle(session);
1827     }
1828     return NULL;
1829 }
1830 
/*
 * Replace every entry of fileListInfo->files with its canonical absolute path.
 *
 * Each name is resolved with realpath(name, NULL); the old string is freed and the newly allocated result
 * takes its place. Processing stops at the first name that cannot be resolved: entries before it stay
 * resolved, the failing one and all following ones are left untouched.
 *
 * Returns NSTACKX_EOK when all names were resolved, NSTACKX_EFAILED otherwise.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    for (uint32_t idx = 0; idx < fileListInfo->fileNum; idx++) {
        char *givenName = fileListInfo->files[idx];
        char *resolvedName = realpath(givenName, NULL);
        if (resolvedName == NULL) {
            LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        fileListInfo->files[idx] = resolvedName;
        free(givenName);
    }
    return NSTACKX_EOK;
}
1850 
/*
 * Release a FileListInfo container: the "files" pointer array, the "remotePath" string and the structure
 * itself. The strings referenced by files[] are NOT freed here.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    free(fileListInfo->files);
    fileListInfo->files = NULL;

    /* free(NULL) is a no-op, so no guard is needed for an unset remote path. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo);
}
1862 
/*
 * Create a new sending transfer for "fileListInfo" and append it to the session transfer chain.
 *
 * The transfer id is lastDFileTransId + 1, skipping 0 on wrap-around. On success the transfer is linked into
 * session->dFileTransChain, lastDFileTransId is updated, the matching processing counter is incremented and
 * the "fileListInfo" container is released through FreeTransFileListInfo() (the file name strings are
 * reused by the transfer). On every failure path "fileListInfo" is not freed by this function.
 *
 * Returns:
 *   NSTACKX_EOK     - transfer created and queued;
 *   NSTACKX_ENOMEM  - CreateTrans() failed;
 *   NSTACKX_EFAILED - no peer info available, path resolution failed or extra info could not be added;
 *   other           - error code returned by DFileTransSendFiles().
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    uint16_t transId = session->lastDFileTransId + 1;
    if (transId == 0) { /* overflow */
        transId = 1;
    }

    PeerInfo *peerInfo = TransSelectPeerInfo(session);
    if (peerInfo == NULL) {
        /* peerInfo is dereferenced below, so refuse to start a transfer without one. */
        LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
    if (trans == NULL) {
        LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to apply the MTU is only logged, the transfer continues. */
    if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
        LOGE(TAG, "set trans mtu failed");
    }
    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    int32_t ret = DFileTransSendFiles(trans, fileListInfo);
    if (ret != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
        return ret;
    }
    ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
                                 fileListInfo->userData);
    if (ret != NSTACKX_EOK) {
        LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    trans->fileList->tarFlag = fileListInfo->tarFlag;
    trans->fileList->smallFlag = fileListInfo->smallFlag;
    trans->fileList->tarFile = fileListInfo->tarFile;
    trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;

    /* userData has been handed to the transfer above, drop the reference held by the list info. */
    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    session->lastDFileTransId = transId;
    if (fileListInfo->smallFlag == NSTACKX_TRUE) {
        session->smallListProcessingCnt++;
    } else {
        session->fileListProcessingCnt++;
    }
    /* Elements in fileListInfo->files[] are reused by the transfer, so only the container is freed. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1914 
/*
 * Start a new transfer for "fileListInfo" while holding session->transIdLock.
 *
 * Returns NSTACKX_EFAILED when the lock cannot be taken, otherwise the result of DFileStartTransInner().
 * A failing unlock is only logged and does not change the returned value.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    if (PthreadMutexLock(&session->transIdLock) != 0) {
        LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    int32_t startRet = DFileStartTransInner(session, fileListInfo);

    if (PthreadMutexUnlock(&session->transIdLock) != 0) {
        LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1927 
TerminateMainThreadInner(void * arg)1928 void TerminateMainThreadInner(void *arg)
1929 {
1930     DFileSession *session = (DFileSession *)arg;
1931     DFileSessionSetTerminateFlag(session);
1932 }
1933 
/*
 * Start the session worker threads in this order: main loop, sender(s), receiver, control.
 *
 * When a later thread cannot be created, the threads started before it are stopped in reverse order through
 * the chain of labels below: each label tears down the thread created just before the failing step and then
 * falls through to the next one.
 *
 * Returns NSTACKX_EOK when all threads are running, NSTACKX_EFAILED otherwise.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;
L_ERR_CONTROL_THREAD:
    /* The control thread was never created; the thread to stop and join here is the receiver. */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /*
     * Wake the sender before joining it: it may be blocked on its outbound-queue semaphore and would
     * otherwise never notice the terminate flag, leaving the join below stuck.
     */
    PostOutboundQueueWait(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post an event so the main loop wakes up and handles the terminate request. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1974 
/*
 * Message callback registered with the file manager; "context" is the owning DFileSession.
 * FILE_MANAGER_INNER_ERROR logs the error code and posts a fatal event, FILE_MANAGER_IN_PROGRESS triggers a
 * session progress notification. Every other message type is ignored.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
1987 
/*
 * Create the file manager of a session and store it in session->fileManager.
 *
 * Validation performed before creation:
 *   - "session" must not be NULL (NSTACKX_EINVAL);
 *   - a sender must use CONNECT_TYPE_P2P or CONNECT_TYPE_WLAN (NSTACKX_EINVAL);
 *   - when "keyLen" is non-zero it must equal AES_128_KEY_LENGTH, "key" must not be NULL and crypto support
 *     must be available (NSTACKX_EFAILED).
 *
 * The file manager reports back to the session through FileManagerMsgHandle() using the session epoll
 * descriptor and event node chain.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_EFAILED when the file manager could not be created.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    if (session == NULL) {
        LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }

    uint8_t knownConnType = (connType == CONNECT_TYPE_P2P || connType == CONNECT_TYPE_WLAN);
    if (isSender && !knownConnType) {
        LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }

    if (keyLen > 0 && (keyLen != AES_128_KEY_LENGTH || key == NULL)) {
        LOGE(TAG, "error key or key len");
        return NSTACKX_EFAILED;
    }
    if (keyLen > 0 && !IsCryptoIncluded()) {
        LOGE(TAG, "crypto is not opened");
        return NSTACKX_EFAILED;
    }

    FileManagerMsgPara msgPara;
    msgPara.epollfd = session->epollfd;
    msgPara.eventNodeChain = &session->eventNodeChain;
    msgPara.msgReceiver = FileManagerMsgHandle;
    msgPara.context = session;

    session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
    if (session->fileManager == NULL) {
        LOGE(TAG, "create filemanager failed");
        return NSTACKX_EFAILED;
    }
    return NSTACKX_EOK;
}
2021 
/*
 * Close both ends of the session receiver pipe, PIPE_OUT first and then PIPE_IN. Ends that are already
 * INVALID_PIPE_DESC are skipped; every closed end is reset to INVALID_PIPE_DESC.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const int pipeEnds[] = { PIPE_OUT, PIPE_IN };

    for (uint32_t i = 0; i < sizeof(pipeEnds) / sizeof(pipeEnds[0]); i++) {
        if (session->receiverPipe[pipeEnds[i]] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[pipeEnds[i]]);
        session->receiverPipe[pipeEnds[i]] = INVALID_PIPE_DESC;
    }
}
2033 
2034