1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_session.h"
17
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37
38 #define TAG "nStackXDFile"
39
40 #define DEFAULT_WAIT_TIME_MS 1000
41 #define MULTI_THREADS_SOCKET_WAIT_TIME_MS 1
42 #define DEFAULT_NEGOTIATE_TIMEOUT 1000
43 #define TCP_NEGOTIATE_TIMEOUT 10000
44 #define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
45 #define MAX_NEGOTIATE_TIMEOUT_COUNT 3
46 #define MAX_UNPROCESSED_READ_EVENT_COUNT 3
47 #define MAX_RECVBUF_COUNT 130000
48 #define MAX_NOMEM_PRINT 10000
49
50 static void ReadEventHandle(void *arg);
51 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
52
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)53 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
54 const struct sockaddr_in *peerAddr, uint8_t socketIndex)
55 {
56 QueueNode *queueNode = NULL;
57
58 if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
59 return NULL;
60 }
61
62 queueNode = calloc(1, sizeof(QueueNode));
63 if (queueNode == NULL) {
64 return NULL;
65 }
66
67 queueNode->frame = calloc(1, length);
68 if (queueNode->frame == NULL) {
69 free(queueNode);
70 return NULL;
71 }
72 queueNode->length = length;
73 queueNode->sendLen = 0;
74
75 if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
76 free(queueNode->frame);
77 free(queueNode);
78 return NULL;
79 }
80 if (peerAddr != NULL) {
81 if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
82 free(queueNode->frame);
83 free(queueNode);
84 return NULL;
85 }
86 }
87 queueNode->socketIndex = socketIndex;
88
89 return queueNode;
90 }
91
DestroyQueueNode(QueueNode * queueNode)92 void DestroyQueueNode(QueueNode *queueNode)
93 {
94 if (queueNode != NULL) {
95 free(queueNode->frame);
96 free(queueNode);
97 }
98 }
99
NotifyMsgRecver(const DFileSession * session,DFileMsgType msgType,const DFileMsg * msg)100 void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
101 {
102 if (session == NULL) {
103 LOGI(TAG, "session is NULL");
104 return;
105 }
106
107 if (session->msgReceiver == NULL) {
108 LOGI(TAG, "msgReceiver is NULL");
109 return;
110 }
111
112 session->msgReceiver(session->sessionId, msgType, msg);
113 }
114
CalculateSessionTransferRatePrepare(DFileSession * session)115 void CalculateSessionTransferRatePrepare(DFileSession *session)
116 {
117 #ifdef NSTACKX_SMALL_FILE_SUPPORT
118 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
119 || !ListIsEmpty(&session->smallFileLists))) {
120 return;
121 }
122 #else
123 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
124 return;
125 }
126 #endif
127 if (!ListIsEmpty(&session->dFileTransChain)) {
128 return;
129 }
130 LOGI(TAG, "begin to calulate transfer rate");
131 session->bytesTransferred = 0;
132 session->transCount = 0;
133 ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
134 }
135
136 #ifdef NSTACKX_SMALL_FILE_SUPPORT
SendSmallList(DFileSession * session)137 static int32_t SendSmallList(DFileSession *session)
138 {
139 FileListInfo *fileListInfo = NULL;
140 DFileMsg data;
141 if (session->fileListProcessingCnt > 0 || session->fileListPendingCnt == 0) {
142 while (!ListIsEmpty(&session->smallFileLists)) {
143 fileListInfo = (FileListInfo *)ListPopFront(&session->smallFileLists);
144 session->smallListPendingCnt--;
145 if (fileListInfo == NULL) {
146 continue;
147 }
148 int32_t ret = DFileStartTrans(session, fileListInfo);
149 if (ret == NSTACKX_EOK) {
150 return NSTACKX_TRUE;
151 }
152 LOGE(TAG, "DFileStartTrans fail, error: %d", ret);
153 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
154 data.errorCode = ret;
155 data.fileList.files = (const char **)fileListInfo->files;
156 data.fileList.fileNum = fileListInfo->fileNum;
157 data.fileList.userData = fileListInfo->userData;
158 NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &data);
159 DestroyFileListInfo(fileListInfo);
160 }
161 }
162 return NSTACKX_FALSE;
163 }
164 #endif
165
SendPendingList(DFileSession * session)166 static void SendPendingList(DFileSession *session)
167 {
168 FileListInfo *fileListInfo = NULL;
169 DFileMsg data;
170 while (!ListIsEmpty(&session->pendingFileLists)) {
171 fileListInfo = (FileListInfo *)ListPopFront(&session->pendingFileLists);
172 session->fileListPendingCnt--;
173 if (fileListInfo == NULL) {
174 continue;
175 }
176 int32_t ret = DFileStartTrans(session, fileListInfo);
177 if (ret == NSTACKX_EOK) {
178 break;
179 }
180 LOGE(TAG, "DFileStartTrans fail, error: %d", ret);
181 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
182 data.errorCode = ret;
183 data.fileList.files = (const char **)fileListInfo->files;
184 data.fileList.fileNum = fileListInfo->fileNum;
185 data.fileList.userData = fileListInfo->userData;
186 NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &data);
187 DestroyFileListInfo(fileListInfo);
188 }
189 }
190
SendSmallAndPendingList(DFileSession * session)191 static void SendSmallAndPendingList(DFileSession *session)
192 {
193 #ifdef NSTACKX_SMALL_FILE_SUPPORT
194 LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
195 session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
196 session->smallListProcessingCnt);
197 if (SendSmallList(session) != NSTACKX_TRUE) {
198 SendPendingList(session);
199 }
200 #else
201 LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
202 session->fileListPendingCnt, session->fileListProcessingCnt);
203 SendPendingList(session);
204 #endif
205 }
206
NoticeSessionProgress(DFileSession * session)207 void NoticeSessionProgress(DFileSession *session)
208 {
209 DFileMsg data;
210 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
211 if (FileManagerGetTotalBytes(session->fileManager, &data.transferUpdate.totalBytes) == NSTACKX_EOK &&
212 FileManagerGetBytesTransferred(session->fileManager, &data.transferUpdate.bytesTransferred) == NSTACKX_EOK
213 && data.transferUpdate.bytesTransferred <= data.transferUpdate.totalBytes
214 && data.transferUpdate.bytesTransferred > 0) {
215 NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &data);
216 }
217 }
218
UpdateMsgProcessInfo(const DFileSession * session,struct DFileTrans * dFileTrans,DFileTransMsgType msgType,DFileTransMsg * msg)219 static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
220 DFileTransMsgType msgType, DFileTransMsg *msg)
221 {
222 uint64_t totalBytes = 0;
223 uint64_t bytesTrans = 0;
224
225 if (session == NULL || dFileTrans == NULL || msg == NULL ||
226 (msgType != DFILE_TRANS_MSG_FILE_RECEIVED && msgType != DFILE_TRANS_MSG_FILE_RECEIVE_FAIL &&
227 msgType != DFILE_TRANS_MSG_FILE_SEND_FAIL && msgType != DFILE_TRANS_MSG_FILE_SENT)) {
228 return;
229 }
230
231 msg->transferUpdate.transId = dFileTrans->transId;
232
233 if (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT) {
234 totalBytes = DFileTransGetTotalBytes(dFileTrans);
235 if (totalBytes > 0) {
236 bytesTrans = totalBytes;
237 goto L_END;
238 }
239 }
240
241 if (FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &totalBytes, &bytesTrans) !=
242 NSTACKX_EOK) {
243 return;
244 }
245
246 L_END:
247 msg->transferUpdate.totalBytes = totalBytes;
248 msg->transferUpdate.bytesTransferred = bytesTrans;
249 if (msgType == DFILE_TRANS_MSG_FILE_SENT) {
250 if (dFileTrans->fileList->vtransFlag) {
251 msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
252 msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
253 }
254 NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
255 }
256 }
257
WakeSendThread(DFileSession * session,uint8_t isSender,uint8_t socketIndex)258 static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
259 {
260 SemPost(&session->outboundQueueWait[socketIndex]);
261 if (isSender && session->clientSendThreadNum > 1) {
262 for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
263 SemPost(&session->sendThreadPara[i].sendWait);
264 }
265 }
266 }
267
CalculateSessionTransferRate(DFileSession * session,uint64_t totalBytes,DFileTransMsgType msgType)268 static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
269 {
270 if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
271 return;
272 }
273 if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
274 session->bytesTransferred += totalBytes;
275 } else {
276 session->bytesTransferred = UINT64_MAX;
277 }
278 session->transCount++;
279 if (!ListIsEmpty(&session->dFileTransChain)) {
280 return;
281 }
282 #ifdef NSTACKX_SMALL_FILE_SUPPORT
283 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
284 || !ListIsEmpty(&session->smallFileLists))) {
285 return;
286 }
287 #else
288 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
289 return;
290 }
291 #endif
292 struct timespec endTs;
293 ClockGetTime(CLOCK_MONOTONIC, &endTs);
294
295 uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
296 if (spendTime == 0) {
297 return;
298 }
299 const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
300 LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
301 session->transCount, session->bytesTransferred, spendTime, rate);
302 DFileMsg msgData;
303 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
304 msgData.rate = (uint32_t)rate;
305 NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);
306 }
307
CheckTransDone(DFileSession * session,struct DFileTrans * dFileTrans,DFileTransMsgType msgType)308 static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
309 {
310 if (msgType == DFILE_TRANS_MSG_FILE_RECEIVE_FAIL || msgType == DFILE_TRANS_MSG_FILE_SENT ||
311 msgType == DFILE_TRANS_MSG_FILE_SEND_FAIL || msgType == DFILE_TRANS_MSG_END) {
312 uint8_t flag = dFileTrans->fileList->smallFlag;
313 if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
314 LOGE(TAG, "set trans id state fail");
315 }
316 ((PeerInfo *)dFileTrans->context)->currentTransCount--;
317 ListRemoveNode(&dFileTrans->list);
318 uint64_t totalBytes = DFileTransGetTotalBytes(dFileTrans);
319 DFileTransDestroy(dFileTrans);
320 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
321 if (flag == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
322 session->smallListProcessingCnt--;
323 } else if (session->fileListProcessingCnt > 0) {
324 session->fileListProcessingCnt--;
325 }
326 SendSmallAndPendingList(session);
327 }
328 CalculateSessionTransferRate(session, totalBytes, msgType);
329 }
330 }
331
DTransMsgReceiver(struct DFileTrans * dFileTrans,DFileTransMsgType msgType,DFileTransMsg * msg)332 static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
333 DFileTransMsg *msg)
334 {
335 PeerInfo *peerInfo = dFileTrans->context;
336 DFileSession *session = peerInfo->session;
337
338 UpdateMsgProcessInfo(session, dFileTrans, msgType, msg);
339
340 switch (msgType) {
341 case DFILE_TRANS_MSG_FILE_SEND_DATA:
342 WakeSendThread(session, dFileTrans->isSender, peerInfo->socketIndex);
343 break;
344 case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
345 NotifyMsgRecver(session, DFILE_ON_FILE_LIST_RECEIVED, msg);
346 break;
347 case DFILE_TRANS_MSG_FILE_RECEIVED: /* Receiver receive all the file data */
348 NotifyMsgRecver(session, DFILE_ON_FILE_RECEIVE_SUCCESS, msg);
349 break;
350 case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
351 case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
352 NotifyMsgRecver(session, DFILE_ON_FILE_RECEIVE_FAIL, msg);
353 break;
354 case DFILE_TRANS_MSG_FILE_SENT: /* Sender send TRANSFER DONE ACK frame and come to end */
355 NoticeSessionProgress(session);
356 NotifyMsgRecver(session, DFILE_ON_FILE_SEND_SUCCESS, msg);
357 break;
358 case DFILE_TRANS_MSG_FILE_SEND_FAIL:
359 NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, msg);
360 break;
361 case DFILE_TRANS_MSG_IN_PROGRESS:
362 NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
363 break;
364 case DFILE_TRANS_MSG_FILE_SEND_ACK:
365 ProcessSessionTrans(session, dFileTrans->transId);
366 break;
367 default:
368 LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
369 }
370
371 CheckTransDone(session, dFileTrans, msgType);
372 }
373
ServerSettingTimeoutHandle(void * data)374 static void ServerSettingTimeoutHandle(void *data)
375 {
376 PeerInfo *peerInfo = data;
377 ListRemoveNode(&peerInfo->list);
378 TimerDelete(peerInfo->settingTimer);
379 peerInfo->session->peerInfoCnt--;
380 free(peerInfo);
381 LOGI(TAG, "DFileServer Setting Negotationion timeout");
382 }
383
ClientSettingTimeoutHandle(void * data)384 static void ClientSettingTimeoutHandle(void *data)
385 {
386 PeerInfo *peerInfo = data;
387 uint8_t cnt = peerInfo->settingTimeoutCnt++;
388 DFileMsg msgData;
389 uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
390 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
391 if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
392 TimerDelete(peerInfo->settingTimer);
393 peerInfo->settingTimer = NULL;
394 peerInfo->settingTimeoutCnt = 0;
395 msgData.errorCode = NSTACKX_EFAILED;
396 LOGI(TAG, "DFileClient Setting Negotationion timeout");
397 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
398 } else {
399 DFileSessionSendSetting(peerInfo);
400 LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
401 if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
402 msgData.errorCode = NSTACKX_EFAILED;
403 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
404 LOGE(TAG, "Timer setting timer fail");
405 }
406 }
407 }
408
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)409 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
410 {
411 List *pos = NULL;
412 DFileTrans *trans = NULL;
413
414 if (dFileTransChain == NULL || transId == 0) {
415 return NULL;
416 }
417
418 LIST_FOR_EACH(pos, dFileTransChain) {
419 trans = (DFileTrans *)pos;
420 if (trans->transId == transId) {
421 return trans;
422 }
423 }
424 return NULL;
425 }
426
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)427 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
428 {
429 List *pos = NULL;
430 PeerInfo *peerInfo = NULL;
431 LIST_FOR_EACH(pos, &session->peerInfoChain) {
432 peerInfo = (PeerInfo *)pos;
433 if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
434 peerInfo->session->sessionId == session->sessionId) {
435 return peerInfo;
436 }
437 }
438 return NULL;
439 }
440
DFileSessionSendSetting(PeerInfo * peerInfo)441 void DFileSessionSendSetting(PeerInfo *peerInfo)
442 {
443 uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
444 uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
445 size_t frameLen = 0;
446 DFileMsg data;
447 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
448 SettingFrame settingFramePara;
449 settingFramePara.connType = peerInfo->connType;
450 settingFramePara.mtu = peerInfo->localMtu;
451 settingFramePara.capability = peerInfo->session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
452 settingFramePara.dataFrameSize = 0;
453 settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
454 EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);
455 int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
456 if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
457 data.errorCode = NSTACKX_EFAILED;
458 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
459 return;
460 }
461
462 if (peerInfo->settingTimer != NULL) {
463 return;
464 }
465
466 if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
467 peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, timeout,
468 NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
469 if (peerInfo->settingTimer == NULL) {
470 LOGE(TAG, "setting timmer creat fail");
471 data.errorCode = NSTACKX_EFAILED;
472 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
473 }
474 } else {
475 peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
476 NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
477 if (peerInfo->settingTimer == NULL) {
478 return;
479 }
480 }
481 }
482
SetDFileSessionConfig(DFileSession * session,DFileConfig * dFileConfig,uint16_t connType,PeerInfo * peerInfo)483 static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
484 PeerInfo *peerInfo)
485 {
486 peerInfo->maxSendRate = dFileConfig->sendRate;
487 (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
488 peerInfo->fastStartCounter = 0;
489 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);
490 peerInfo->dataFrameSize = dFileConfig->dataFrameSize;
491 if (connType == CONNECT_TYPE_WLAN) {
492 peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
493 } else {
494 peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
495 }
496
497 #ifndef NSTACKX_WITH_LITEOS
498 if ((!peerInfo->gotWifiRate) && (connType == CONNECT_TYPE_WLAN)) {
499 peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
500 / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
501 }
502 #endif
503 if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
504 peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
505 }
506
507 if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
508 LOGE(TAG, "failed to set max frame length");
509 }
510
511 LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
512 connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);
513 if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
514 if (FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
515 LOGE(TAG, "failed to set recv para");
516 }
517 }
518 }
519
DFileSessionSetPeerInfo(PeerInfo * peerInfo,SettingState state,SettingFrame * settingFrame)520 static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
521 {
522 peerInfo->state = state;
523 peerInfo->mtu = settingFrame->mtu;
524 peerInfo->connType = settingFrame->connType;
525 uint16_t localMtu = peerInfo->localMtu;
526 peerInfo->mtuInuse = (localMtu < peerInfo->mtu) ? localMtu : peerInfo->mtu;
527 peerInfo->remoteSessionId = settingFrame->header.sessionId;
528 }
529
DFileSessionHandleClientSetting(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr)530 static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
531 {
532 List *pos = NULL;
533 SettingFrame hostSettingFrame;
534 (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
535 LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
536 /* unsupport version */
537 if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
538 return;
539 }
540 PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
541 if (peerInfo == NULL) {
542 LOGE(TAG, "recv unknow peer setting, maybe be attacked");
543 return;
544 }
545 peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
546 TimerDelete(peerInfo->settingTimer);
547 peerInfo->settingTimer = NULL;
548 DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
549 LIST_FOR_EACH(pos, &session->dFileTransChain) {
550 DFileTrans *trans = (DFileTrans *)pos;
551 if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
552 LOGE(TAG, "set trans mtu failed");
553 }
554 }
555 DFileMsg data;
556 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
557 data.errorCode = NSTACKX_EOK;
558 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);
559
560 DFileConfig dFileConfig;
561 (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
562 if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
563 SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
564 }
565 if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
566 LOGI(TAG, "server replies not support Link Sequence");
567 } else {
568 LOGI(TAG, "server replies using normal ACK");
569 session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
570 }
571 }
572
DFileGetMTU(SocketProtocol protocol)573 static uint16_t DFileGetMTU(SocketProtocol protocol)
574 {
575 /* for udp, return NSTACKX_DEFAULT_FRAME_SIZE, for D2D, need call D2D MTU interface */
576 uint16_t mtu = 0;
577 if (protocol == NSTACKX_PROTOCOL_UDP) {
578 mtu = NSTACKX_DEFAULT_FRAME_SIZE;
579 } else if (protocol == NSTACKX_PROTOCOL_D2D) {
580 LOGE(TAG, "d2d not support");
581 } else if (protocol == NSTACKX_PROTOCOL_TCP) {
582 mtu = NSTACKX_DEFAULT_FRAME_SIZE;
583 }
584
585 return mtu;
586 }
587
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)588 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
589 uint16_t connType, uint8_t socketIndex)
590 {
591 if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
592 return NULL;
593 }
594 PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
595 if (peerInfo == NULL) {
596 return NULL;
597 }
598 peerInfo->session = session;
599 peerInfo->dstAddr = *dstAddr;
600 peerInfo->connType = connType;
601 peerInfo->socketIndex = socketIndex;
602 peerInfo->localMtu = DFileGetMTU(session->protocol);
603 session->peerInfoCnt++;
604 peerInfo->gotWifiRate = 0;
605 peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
606 peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
607
608 if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
609 peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
610 LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
611 }
612
613 if (peerMtu == 0) {
614 return peerInfo;
615 }
616 peerInfo->mtu = peerMtu;
617 peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
618 return peerInfo;
619 }
620
HandleLinkSeqCap(DFileSession * session,SettingFrame * hostSettingFrame)621 static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
622 {
623 if (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) {
624 LOGI(TAG, "client wants to enable Link Sequence");
625 } else {
626 LOGI(TAG, "client wants to use normal ACK");
627 session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
628 }
629
630 if (hostSettingFrame->capability & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) {
631 LOGI(TAG, "client support recv feedback");
632 session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
633 } else {
634 LOGI(TAG, "client do not support recv feedback");
635 session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
636 }
637 }
638
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)639 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
640 uint8_t socketIndex)
641 {
642 PeerInfo *peerInfo = NULL;
643
644 peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
645 if (peerInfo == NULL) {
646 return NULL;
647 }
648
649 peerInfo->remoteSessionId = frame->header.sessionId;
650 peerInfo->state = SETTING_NEGOTIATING;
651 DFileSessionSendSetting(peerInfo);
652 if (peerInfo->settingTimer == NULL) {
653 free(peerInfo);
654 session->peerInfoCnt--;
655 return NULL;
656 }
657 ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
658 return peerInfo;
659 }
660
DFileSessionHandleServerSetting(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr,uint8_t socketIndex)661 static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
662 struct sockaddr_in *peerAddr, uint8_t socketIndex)
663 {
664 SettingFrame hostSettingFrame;
665 DFileConfig dFileConfig;
666 (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
667 (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
668 LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
669 if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
670 return;
671 }
672
673 PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
674 if (peerInfo != NULL) {
675 if (peerInfo->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
676 LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
677 return;
678 } else {
679 DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATING, &hostSettingFrame);
680 DFileSessionSendSetting(peerInfo);
681 goto L_END;
682 }
683 }
684
685 peerInfo = AllocPeerInfo(session, peerAddr, &hostSettingFrame, socketIndex);
686 if (peerInfo == NULL) {
687 return;
688 }
689
690 L_END:
691 peerInfo->settingTimeoutCnt++;
692 LOGI(TAG, "DFileServer response Setting Frame. count %u", peerInfo->settingTimeoutCnt);
693 peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
694 if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
695 SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
696 }
697 HandleLinkSeqCap(session, &hostSettingFrame);
698 }
699
DFileSessionHandleSetting(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr,uint8_t socketIndex)700 static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
701 struct sockaddr_in *peerAddr, uint8_t socketIndex)
702 {
703 if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
704 DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
705 } else if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
706 DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
707 } else {
708 return;
709 }
710 }
711
HandleWithoutSettingError(DFileSession * session,const struct sockaddr_in * peerAddr)712 static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
713 {
714 List *pos = NULL;
715 DFileTrans *trans = NULL;
716
717 PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
718 if (peerInfo == NULL) {
719 LOGE(TAG, "recv unknow peer rst, maybe be attacked");
720 return;
721 }
722
723 LIST_FOR_EACH(pos, &session->dFileTransChain) {
724 trans = (DFileTrans *)pos;
725 /*
726 * when client recv peer RST NSTACKX_DFILE_WITHOUT_SETTING_ERROR, Set Trans MTU 0,
727 * and will reset after new setting negotion.
728 */
729 if (DFileTransSetMtu(trans, 0) != NSTACKX_EOK) {
730 LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
731 }
732 }
733
734 LIST_FOR_EACH(pos, &session->peerInfoChain) {
735 peerInfo = (PeerInfo *)pos;
736 if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
737 peerInfo->session->sessionId == session->sessionId) {
738 TimerDelete(peerInfo->settingTimer);
739 peerInfo->settingTimer = NULL;
740 peerInfo->state = SETTING_NEGOTIATING;
741 LOGD(TAG, "Send Setting Frame");
742 DFileSessionSendSetting(peerInfo);
743 break;
744 }
745 }
746 }
747
DFileSessionHandleRst(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr)748 static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
749 {
750 uint16_t errCode = 0;
751 if (DecodeRstFrame((RstFrame *)dFileFrame, &errCode, NULL, NULL) != NSTACKX_EOK) {
752 return;
753 }
754
755 uint16_t transId = ntohs(dFileFrame->header.transId);
756 LOGD(TAG, "handle RST (%u) frame, transId %u", errCode, transId);
757
758 switch (errCode) {
759 case NSTACKX_DFILE_WITHOUT_SETTING_ERROR:
760 HandleWithoutSettingError(session, peerAddr);
761 break;
762 default:
763 LOGE(TAG, "Unspported error code %u", errCode);
764 break;
765 }
766 }
767
DFileSessionResolveBackPress(DFileSession * session,DataBackPressure backPress,uint32_t count)768 static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
769 {
770 uint32_t index;
771
772 if (PthreadMutexLock(&session->backPressLock) != 0) {
773 LOGE(TAG, "pthread backPressLock mutex lock failed");
774 return;
775 }
776
777 if (backPress.recvListOverIo == 1) {
778 for (index = 0; index < count; index++) {
779 session->stopSendCnt[index]++;
780 }
781 } else {
782 for (index = 0; index < count; index++) {
783 session->stopSendCnt[index] = 0;
784 }
785 }
786
787 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
788 LOGE(TAG, "pthread backPressLock mutex unlock failed");
789 return;
790 }
791
792 return;
793 }
794
DFileSessionHandleBackPressure(DFileSession * session,const DFileFrame * dFileFrame,const struct sockaddr_in * peerAddr)795 static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
796 const struct sockaddr_in *peerAddr)
797 {
798 DataBackPressure backPress;
799 PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
800 if (peerInfo == NULL) {
801 LOGE(TAG, "can't get valid peerinfo");
802 return;
803 }
804
805 if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &backPress) != NSTACKX_EOK) {
806 return;
807 }
808
809 DFileSessionResolveBackPress(session, backPress, session->clientSendThreadNum);
810
811 LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
812 backPress.recvListOverIo, backPress.recvBufThreshold,
813 backPress.stopSendPeriod);
814
815 return;
816 }
817
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)818 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
819 {
820 DFileTransPara transPara;
821
822 if (peerInfo == NULL) {
823 return NULL;
824 }
825 (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
826 transPara.isSender = isSender;
827 transPara.transId = transId; /* for receiver, use transId of sender */
828 transPara.fileManager = session->fileManager;
829 transPara.writeHandle = DFileWriteHandle;
830 transPara.msgReceiver = DTransMsgReceiver;
831 transPara.connType = peerInfo->connType;
832 transPara.context = peerInfo;
833 transPara.session = session;
834 transPara.onRenameFile = session->onRenameFile;
835
836 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
837 return DFileTransCreate(&transPara);
838 }
839
DFileSessionHandleFrame(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr)840 static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
841 {
842 PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
843 if (peerInfo == NULL) {
844 LOGI(TAG, "can't find peerInfo");
845 return NSTACKX_EFAILED;
846 }
847
848 if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_SERVER && peerInfo->state == SETTING_NEGOTIATING) {
849 peerInfo->state = SETTING_NEGOTIATED;
850 TimerDelete(peerInfo->settingTimer);
851 peerInfo->settingTimer = NULL;
852 }
853
854 uint16_t transId = ntohs(dFileFrame->header.transId);
855 if (transId == 0) {
856 LOGE(TAG, "transId is 0");
857 return NSTACKX_EFAILED;
858 }
859
860 DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), transId);
861 if (trans == NULL) {
862 if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
863 /* Only HEADER frame can start dfile transfer (receiver) */
864 LOGE(TAG, "trans %u is NULL && type is %u", transId, dFileFrame->header.type);
865 return NSTACKX_EFAILED;
866 }
867 if (IsTransIdDone(session, transId) == NSTACKX_EOK) {
868 return NSTACKX_EFAILED;
869 }
870 trans = CreateTrans(transId, session, peerInfo, NSTACKX_FALSE);
871 if (trans == NULL) {
872 DFileMsg data;
873 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
874 data.errorCode = NSTACKX_ENOMEM;
875 NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &data);
876 LOGE(TAG, "trans is NULL");
877 return NSTACKX_EFAILED;
878 }
879 if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
880 LOGE(TAG, "set trans mtu failed");
881 }
882 CalculateSessionTransferRatePrepare(session);
883 ListInsertTail(&(session->dFileTransChain), &(trans->list));
884 peerInfo->currentTransCount++;
885 }
886
887 return HandleDFileFrame(trans, dFileFrame);
888 }
889
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)890 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
891 {
892 PeerInfo *peerInfo = context;
893 DFileSession *session = peerInfo->session;
894 QueueNode *queueNode = NULL;
895 struct sockaddr_in peerAddr;
896
897 peerAddr = peerInfo->dstAddr;
898 queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
899 if (queueNode == NULL) {
900 return NSTACKX_ENOMEM;
901 }
902
903 if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
904 LOGE(TAG, "pthread mutex lock failed");
905 free(queueNode->frame);
906 free(queueNode);
907 return NSTACKX_EFAILED;
908 }
909 ListInsertTail(&session->outboundQueue, &queueNode->list);
910 session->outboundQueueSize++;
911 if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
912 /* Don't need to free node, as it's mount to outboundQueue */
913 LOGE(TAG, "pthread mutex unlock failed");
914 return NSTACKX_EFAILED;
915 }
916 SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
917 return (int32_t)len;
918 }
919
BindMainLoopToTargetCpu(void)920 static void BindMainLoopToTargetCpu(void)
921 {
922 int32_t cpu;
923 int32_t cpus = GetCpuNum();
924 if (cpus >= FIRST_CPU_NUM_LEVEL) {
925 return;
926 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
927 cpu = CPU_IDX_0;
928 } else {
929 return;
930 }
931 StartThreadBindCore(cpu);
932 }
933
GetEpollWaitTimeOut(DFileSession * session)934 static int64_t GetEpollWaitTimeOut(DFileSession *session)
935 {
936 int64_t minTimeout = DEFAULT_WAIT_TIME_MS;
937 if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
938 minTimeout = 0;
939 return minTimeout;
940 }
941 List *pos = NULL;
942 DFileTrans *trans = NULL;
943 int64_t timeout;
944 LIST_FOR_EACH(pos, &session->dFileTransChain) {
945 trans = (DFileTrans *)pos;
946 timeout = DFileTransGetTimeout(trans);
947 if (timeout >= 0 && timeout < minTimeout) {
948 minTimeout = timeout;
949 }
950 }
951 if (minTimeout > DEFAULT_WAIT_TIME_MS) {
952 minTimeout = DEFAULT_WAIT_TIME_MS;
953 }
954 return minTimeout;
955 }
956
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)957 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
958 {
959 List *tmp = NULL;
960 List *pos = NULL;
961 LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
962 DFileTrans *trans = (DFileTrans *)pos;
963 if (trans->transId != exceptTransId) {
964 DFileTransProcess(trans);
965 }
966 }
967 }
968
NotifyBindPort(const DFileSession * session)969 static void NotifyBindPort(const DFileSession *session)
970 {
971 DFileMsg data;
972 int32_t socketNum;
973 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
974 socketNum = 1;
975
976 for (int i = 0; i < socketNum; i++) {
977 data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
978 data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
979 }
980 NotifyMsgRecver(session, DFILE_ON_BIND, &data);
981 }
982
DFileMainLoop(void * arg)983 void *DFileMainLoop(void *arg)
984 {
985 DFileSession *session = arg;
986 int32_t ret = NSTACKX_EOK;
987 DFileMsg msgData;
988 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
989 uint8_t isBind = NSTACKX_FALSE;
990 LOGI(TAG, "main thread start");
991 SetThreadName(DFFILE_MAIN_THREAD_NAME);
992 SetMaximumPriorityForThread();
993 SetTidToBindInfo(session, POS_MAIN_THERD_START);
994 NotifyBindPort(session);
995 while (!session->closeFlag) {
996 int64_t minTimeout = GetEpollWaitTimeOut(session);
997 ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
998 if (ret == NSTACKX_EFAILED) {
999 LOGE(TAG, "epoll wait failed");
1000 break;
1001 }
1002 if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1003 BindMainLoopToTargetCpu();
1004 isBind = NSTACKX_TRUE;
1005 }
1006 ProcessSessionTrans(session, 0);
1007 if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
1008 session->partReadFlag = NSTACKX_TRUE;
1009 ReadEventHandle(session);
1010 session->partReadFlag = NSTACKX_FALSE;
1011 }
1012 }
1013
1014 if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
1015 msgData.errorCode = NSTACKX_EFAILED;
1016 NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
1017 }
1018
1019 /* Notify sender thread to terminate */
1020 PostOutboundQueueWait(session);
1021
1022 /* Unblock "select" and notify receiver thread to terminate */
1023 NotifyPipeEvent(session);
1024 return NULL;
1025 }
1026
AmendPeerInfoSendRate(PeerInfo * peerInfo)1027 static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
1028 {
1029 peerInfo->amendSendRate = peerInfo->sendRate;
1030 LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
1031 peerInfo->sendRate, peerInfo->sendFrameRate, peerInfo->amendSendRate);
1032 }
1033
DFileSendCalculateRate(DFileSession * session,PeerInfo * peerInfo)1034 static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
1035 {
1036 uint64_t sendCount;
1037
1038 if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
1039 return;
1040 }
1041
1042 struct timespec nowTime;
1043 ClockGetTime(CLOCK_MONOTONIC, &nowTime);
1044 uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
1045 /* just calculate io rate */
1046 if (measureElapse > peerInfo->rateStateInterval) {
1047 /* ONLY PEER 0 calculate io rate */
1048 sendCount = (uint64_t)peerInfo->sendCount;
1049 if (peerInfo->socketIndex == 0) {
1050 session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
1051 NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
1052 LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
1053 session->fileManager->sendListFullTimes);
1054 session->fileManager->sendListFullTimes = 0;
1055 session->fileManager->iorBytes = 0;
1056 }
1057
1058 peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
1059 peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
1060 NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
1061 if (peerInfo->qdiscSearchNum != 0) {
1062 peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
1063 }
1064
1065 LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu,"
1066 "measureElapse %llu sendFrameRate %u %uMB/s,"
1067 "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u,"
1068 "min qdisc %u max qdisc %u search num %u ave qdisc %u"
1069 "totalRecvBlocks %llu socket:%u "
1070 "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
1071 peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
1072 peerInfo->sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
1073 NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
1074 NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
1075 NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
1076 peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
1077 NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
1078 peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);
1079 AmendPeerInfoSendRate(peerInfo);
1080 ClearSessionStats(session);
1081 ClearPeerinfoStats(peerInfo);
1082 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
1083 }
1084 }
1085
UpdateAllTransRetryCount(DFileSession * session,PeerInfo * peerInfo)1086 void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
1087 {
1088 List *pos = NULL;
1089 DFileTrans *trans = NULL;
1090 uint32_t allRetryCount = 0;
1091 LIST_FOR_EACH(pos, &session->dFileTransChain) { /* for client , only one peer */
1092 trans = (DFileTrans *)pos;
1093 allRetryCount += trans->retryCount;
1094 }
1095 peerInfo->allDtransRetryCount = allRetryCount;
1096 }
1097
DFileRecvCalculateRate(DFileSession * session,DFileFrame * dFileFrame,struct sockaddr_in * peerAddr)1098 static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
1099 {
1100 struct timespec nowTime;
1101 uint64_t recvCount;
1102 uint32_t timeOut;
1103
1104 if (session->sessionType != DFILE_SESSION_TYPE_SERVER ||
1105 dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
1106 return;
1107 }
1108
1109 PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
1110 if (peerInfo == NULL) {
1111 return;
1112 }
1113
1114 timeOut = (peerInfo->connType == CONNECT_TYPE_P2P) ? NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT :
1115 NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;
1116
1117 peerInfo->recvCount++;
1118 ClockGetTime(CLOCK_MONOTONIC, &nowTime);
1119 uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
1120 if (measureElapse > peerInfo->rateStateInterval) {
1121 session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
1122 NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
1123 session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
1124 (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
1125 if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
1126 session->fileManager->iowMaxRate = session->fileManager->iowRate;
1127 }
1128 LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
1129 session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
1130 session->fileManager->iowBytes = 0;
1131 ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
1132 }
1133
1134 measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
1135 if (measureElapse > peerInfo->rateStateInterval) {
1136 recvCount = (uint64_t)peerInfo->recvCount;
1137 peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
1138 peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
1139 NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
1140 peerInfo->recvCount = 0;
1141 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
1142 }
1143 }
1144
CheckElapseTime(const struct timespec * before,uint64_t overRun)1145 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1146 {
1147 struct timespec now;
1148 ClockGetTime(CLOCK_MONOTONIC, &now);
1149 uint64_t elapseUs = GetTimeDiffUs(&now, before);
1150 elapseUs += overRun;
1151 if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1152 if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1153 LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1154 }
1155 return 0;
1156 } else {
1157 uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1158 return delta;
1159 }
1160 }
1161
BindClientSendThreadToTargetCpu(uint32_t idx)1162 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1163 {
1164 int32_t cpu;
1165 int32_t cpus = GetCpuNum();
1166 if (cpus >= FIRST_CPU_NUM_LEVEL) {
1167 return;
1168 } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1169 cpu = CPU_IDX_1 + (int32_t)idx;
1170 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1171 cpu = CPU_IDX_1;
1172 } else {
1173 return;
1174 }
1175 if (cpu > cpus) {
1176 cpu = cpus - 1;
1177 }
1178 StartThreadBindCore(cpu);
1179 }
1180
DFileSenderUpdateMeasureTime(DFileSession * session,uint8_t socketIndex)1181 static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
1182 {
1183 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1184 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
1185 if (peerInfo == NULL) {
1186 return;
1187 }
1188 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
1189 BindClientSendThreadToTargetCpu(0);
1190 }
1191 }
1192
TerminateMainThreadFatalInner(void * arg)1193 static void TerminateMainThreadFatalInner(void *arg)
1194 {
1195 DFileSession *session = (DFileSession *)arg;
1196 DFileSessionSetFatalFlag(session);
1197 }
1198
PostFatalEvent(DFileSession * session)1199 static void PostFatalEvent(DFileSession *session)
1200 {
1201 if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session) != NSTACKX_EOK) {
1202 LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
1203 DFileSessionSetFatalFlag(session);
1204 }
1205 }
1206
GetSocketWaitMs(uint32_t threadNum)1207 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1208 {
1209 if (threadNum > 1) {
1210 return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1211 }
1212 return DEFAULT_WAIT_TIME_MS;
1213 }
1214
SetSendThreadName(uint32_t threadIdx)1215 static void SetSendThreadName(uint32_t threadIdx)
1216 {
1217 char name[MAX_THREAD_NAME_LEN] = {0};
1218 if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1219 LOGE(TAG, "sprintf send thead name failed");
1220 }
1221 SetThreadName(name);
1222 }
1223
1224 typedef struct {
1225 struct DFileSession *session;
1226 uint32_t threadIdx;
1227 }SendThreadCtx;
1228
DFileAddiSenderHandle(void * arg)1229 static void *DFileAddiSenderHandle(void *arg)
1230 {
1231 SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
1232 struct DFileSession *session = sendThreadCtx->session;
1233 uint32_t threadIdx = sendThreadCtx->threadIdx;
1234 free(sendThreadCtx);
1235 int32_t ret = NSTACKX_EOK;
1236 LOGI(TAG, "send thread %u start", threadIdx);
1237 SetSendThreadName(threadIdx);
1238 SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
1239 List unsent;
1240 uint8_t canWrite = NSTACKX_FALSE;
1241 uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1242 BindClientSendThreadToTargetCpu(threadIdx + 1);
1243 ListInitHead(&unsent);
1244 while (!session->addiSenderCloseFlag) {
1245 if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
1246 NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1247 SemWait(&session->sendThreadPara[threadIdx].sendWait);
1248 if (session->addiSenderCloseFlag) {
1249 break;
1250 }
1251 }
1252 if (ret == NSTACKX_EAGAIN) {
1253 ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
1254 if (ret != NSTACKX_EOK || session->closeFlag) {
1255 break;
1256 }
1257 if (!canWrite) {
1258 ret = NSTACKX_EAGAIN;
1259 continue;
1260 }
1261 }
1262 ret = SendDataFrame(session, &unsent, threadIdx, 0);
1263 if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
1264 break;
1265 }
1266 if (ret == NSTACKX_EAGAIN) {
1267 continue;
1268 }
1269 SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
1270 }
1271 if (ret < 0 && ret != NSTACKX_EAGAIN) {
1272 PostFatalEvent(session);
1273 }
1274 DestroyIovList(&unsent, session, threadIdx);
1275 return NULL;
1276 }
1277
CloseAddiSendThread(struct DFileSession * session)1278 static void CloseAddiSendThread(struct DFileSession *session)
1279 {
1280 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1281 return;
1282 }
1283 session->addiSenderCloseFlag = NSTACKX_TRUE;
1284 for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1285 SemPost(&session->sendThreadPara[i].sendWait);
1286 SemPost(&session->sendThreadPara[i].semNewCycle);
1287 PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1288 SemDestroy(&session->sendThreadPara[i].sendWait);
1289 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1290 }
1291 }
1292
ErrorHandle(struct DFileSession * session,uint16_t cnt)1293 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1294 {
1295 while (cnt > 0) {
1296 SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1297 SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1298 PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1299 SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1300 SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1301 cnt--;
1302 }
1303 }
1304
CreateAddiSendThread(struct DFileSession * session)1305 static int32_t CreateAddiSendThread(struct DFileSession *session)
1306 {
1307 uint16_t i;
1308 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1309 return NSTACKX_EOK;
1310 }
1311 session->addiSenderCloseFlag = NSTACKX_FALSE;
1312 for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1313 SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1314 if (sendThreadCtx == NULL) {
1315 goto L_ERR_SENDER_THREAD;
1316 }
1317
1318 if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1319 free(sendThreadCtx);
1320 goto L_ERR_SENDER_THREAD;
1321 }
1322
1323 if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1324 SemDestroy(&session->sendThreadPara[i].sendWait);
1325 free(sendThreadCtx);
1326 goto L_ERR_SENDER_THREAD;
1327 }
1328
1329 sendThreadCtx->session = session;
1330 sendThreadCtx->threadIdx = i;
1331 if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1332 LOGE(TAG, "Create sender thread failed");
1333 SemDestroy(&session->sendThreadPara[i].sendWait);
1334 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1335 free(sendThreadCtx);
1336 goto L_ERR_SENDER_THREAD;
1337 }
1338 }
1339 return NSTACKX_EOK;
1340
1341 L_ERR_SENDER_THREAD:
1342 session->addiSenderCloseFlag = NSTACKX_TRUE;
1343 ErrorHandle(session, i);
1344 return NSTACKX_EFAILED;
1345 }
1346
UpdatePeerinfoQdiscInfo(PeerInfo * peerInfo,uint32_t qDiscLeft)1347 static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
1348 {
1349 if (peerInfo->qdiscMinLeft == 0) {
1350 peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
1351 } else if (peerInfo->qdiscMinLeft > qDiscLeft) {
1352 peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
1353 }
1354 if (peerInfo->qdiscMaxLeft < qDiscLeft) {
1355 peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
1356 }
1357 peerInfo->qdiscSearchNum++;
1358 peerInfo->qdiscAveLeft += qDiscLeft;
1359 }
1360
UpdatePeerinfoAmendSendrateByQdisc(PeerInfo * peerInfo)1361 static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
1362 {
1363 uint32_t qDiscLeft;
1364 uint32_t qDiscLeftSendRate;
1365
1366 if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qDiscLeft) == NSTACKX_EOK) {
1367 if (peerInfo->mtuInuse == 0) {
1368 return;
1369 }
1370 uint32_t mtuNumInOneFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
1371 if (mtuNumInOneFrame == 0) {
1372 mtuNumInOneFrame = 1;
1373 }
1374 UpdatePeerinfoQdiscInfo(peerInfo, qDiscLeft);
1375 qDiscLeftSendRate = qDiscLeft / mtuNumInOneFrame;
1376 if (peerInfo->sendRate >= qDiscLeftSendRate) {
1377 peerInfo->amendSendRate = (int32_t)qDiscLeftSendRate;
1378 } else {
1379 peerInfo->amendSendRate = peerInfo->sendRate;
1380 }
1381 }
1382 return;
1383 }
1384
UpdateInfoAfterSend(DFileSession * session,PeerInfo * peerInfo,int32_t curAmendSendRate,struct timespec * before,uint8_t socketIndex)1385 static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
1386 struct timespec *before, uint8_t socketIndex)
1387 {
1388 session->cycleRunning[socketIndex] = NSTACKX_FALSE;
1389 if (curAmendSendRate > 0 && peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate) {
1390 if (peerInfo->decreaseStatus == 1) {
1391 peerInfo->overRun = 0;
1392 peerInfo->decreaseStatus = 0;
1393 }
1394 peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
1395 if (peerInfo->overRun == 0) {
1396 session->sleepTimes++;
1397 }
1398 }
1399 peerInfo->intervalSendCount = 0;
1400
1401 UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
1402 }
1403
DFileSessionSendFrame(DFileSession * session,QueueNode ** preQueueNode,List * unsent,struct timespec * before,uint8_t socketIndex)1404 static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
1405 struct timespec *before, uint8_t socketIndex)
1406 {
1407 int32_t ret;
1408
1409 if (CapsTcp(session) && !ListIsEmpty(unsent)) {
1410 session->sendRemain = 1;
1411 ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
1412 session->sendRemain = 0;
1413 if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
1414 LOGI(TAG, "ret is %d", ret);
1415 return ret;
1416 }
1417 }
1418
1419 ret = SendOutboundFrame(session, preQueueNode);
1420 if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
1421 return ret;
1422 }
1423 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
1424 if (peerInfo == NULL) {
1425 LOGE(TAG, "can't get valid peerinfo");
1426 return NSTACKX_EFAILED;
1427 }
1428 if (peerInfo->mtuInuse == 0) {
1429 return ret;
1430 }
1431
1432 if (ret == NSTACKX_EOK) {
1433 if (!session->cycleRunning[socketIndex]) {
1434 session->cycleRunning[socketIndex] = NSTACKX_TRUE;
1435 }
1436 ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
1437 }
1438 int32_t curAmendSendRate = peerInfo->amendSendRate;
1439 DFileSendCalculateRate(session, peerInfo);
1440
1441 if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
1442 LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", ret, peerInfo->intervalSendCount);
1443 return ret;
1444 }
1445
1446 UpdateInfoAfterSend(session, peerInfo, curAmendSendRate, before, socketIndex);
1447
1448 for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1449 SemPost(&session->sendThreadPara[i].semNewCycle);
1450 }
1451 return ret;
1452 }
1453
WaitNewSendData(const QueueNode * queueNode,const List * unsent,DFileSession * session,uint8_t socketIndex)1454 static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
1455 {
1456 if (queueNode == NULL && ListIsEmpty(unsent) && !FileManagerHasPendingData(session->fileManager) &&
1457 !session->outboundQueueSize) {
1458 NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1459 SemWait(&session->outboundQueueWait[socketIndex]);
1460 }
1461 }
1462
DFileSenderPre(DFileSession * session,uint8_t socketIndex)1463 static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
1464 {
1465 SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
1466 DFileSenderUpdateMeasureTime(session, socketIndex);
1467 SetMaximumPriorityForThread();
1468 uint32_t pos = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
1469 if (pos >= POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
1470 return;
1471 }
1472
1473 SetTidToBindInfo(session, pos);
1474 }
1475
DFileSenderClose(DFileSession * session,QueueNode * queueNode,List * unsent,void * arg)1476 void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
1477 {
1478 LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);
1479 CloseAddiSendThread(session);
1480 DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
1481 DestroyQueueNode(queueNode);
1482 free(arg);
1483 return;
1484 }
1485
DFileSenderHandle(void * arg)1486 void *DFileSenderHandle(void *arg)
1487 {
1488 DFileSession *session = ((SenderThreadPara*)arg)->session;
1489 uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1490 QueueNode *queueNode = NULL;
1491 List unsent;
1492 int32_t ret = NSTACKX_EOK;
1493 struct timespec before;
1494 uint8_t canWrite = NSTACKX_FALSE;
1495 uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1496 uint8_t isBind = NSTACKX_FALSE;
1497
1498 if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1499 PostFatalEvent(session);
1500 return NULL;
1501 }
1502 ListInitHead(&unsent);
1503 DFileSenderPre(session, socketIndex);
1504 while (!session->closeFlag) {
1505 WaitNewSendData(queueNode, &unsent, session, socketIndex);
1506 if (session->closeFlag) {
1507 break;
1508 }
1509 if (!session->cycleRunning[socketIndex]) {
1510 ClockGetTime(CLOCK_MONOTONIC, &before);
1511 }
1512 if (ret == NSTACKX_EAGAIN) {
1513 ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1514 if (ret != NSTACKX_EOK || session->closeFlag) {
1515 break;
1516 }
1517 if (!canWrite) {
1518 ret = NSTACKX_EAGAIN;
1519 continue;
1520 }
1521 }
1522 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1523 session->transFlag == NSTACKX_TRUE) {
1524 BindClientSendThreadToTargetCpu(0);
1525 isBind = NSTACKX_TRUE;
1526 }
1527 ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1528 if (ret < 0 && ret != NSTACKX_EAGAIN) {
1529 PostFatalEvent(session);
1530 break;
1531 }
1532 }
1533 DFileSenderClose(session, queueNode, &unsent, arg);
1534 return NULL;
1535 }
1536
ClearDFileFrameList(List * head)1537 static void ClearDFileFrameList(List *head)
1538 {
1539 if (head == NULL || ListIsEmpty(head)) {
1540 return;
1541 }
1542 List *tmp = NULL;
1543 List *pos = NULL;
1544 LIST_FOR_EACH_SAFE(pos, tmp, head) {
1545 QueueNode *node = (QueueNode *)pos;
1546 ListRemoveNode(&node->list);
1547 DestroyQueueNode(node);
1548 }
1549 }
1550
CheckDfileType(uint8_t type)1551 static inline int32_t CheckDfileType(uint8_t type)
1552 {
1553 if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1554 return NSTACKX_EFAILED;
1555 }
1556 return NSTACKX_EOK;
1557 }
1558
CheckSessionType(DFileSessionType type)1559 static inline int32_t CheckSessionType(DFileSessionType type)
1560 {
1561 if (type < DFILE_SESSION_TYPE_CLIENT || type > DFILE_SESSION_TYPE_SERVER) {
1562 return NSTACKX_EFAILED;
1563 }
1564 return NSTACKX_EOK;
1565 }
1566
ProcessDFileFrameList(DFileSession * session,List * head)1567 static void ProcessDFileFrameList(DFileSession *session, List *head)
1568 {
1569 QueueNode *queueNode = NULL;
1570 DFileFrame *dFileFrame = NULL;
1571 struct sockaddr_in *peerAddr = NULL;
1572 int32_t handleFrameRet = NSTACKX_EOK;
1573 while (!ListIsEmpty(head) && !session->closeFlag) {
1574 queueNode = (QueueNode *)ListPopFront(head);
1575 if (queueNode == NULL) {
1576 continue;
1577 }
1578 dFileFrame = (DFileFrame *)(queueNode->frame);
1579 peerAddr = &queueNode->peerAddr;
1580 uint8_t type = dFileFrame->header.type;
1581
1582 if (ntohs(dFileFrame->header.length) > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
1583 LOGE(TAG, "header length %u is too big", ntohs(dFileFrame->header.length));
1584 DestroyQueueNode(queueNode);
1585 continue;
1586 }
1587 if (CheckDfileType(dFileFrame->header.type) != NSTACKX_EOK ||
1588 CheckSessionType(session->sessionType) != NSTACKX_EOK) {
1589 handleFrameRet = NSTACKX_EFAILED;
1590 } else if (dFileFrame->header.type == NSTACKX_DFILE_SETTING_FRAME) {
1591 DFileSessionHandleSetting(session, dFileFrame, peerAddr, queueNode->socketIndex);
1592 } else if (dFileFrame->header.type == NSTACKX_DFILE_RST_FRAME &&
1593 dFileFrame->header.transId == 0) {
1594 DFileSessionHandleRst(session, dFileFrame, peerAddr);
1595 } else if (dFileFrame->header.type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
1596 DFileSessionHandleBackPressure(session, dFileFrame, peerAddr);
1597 } else {
1598 /*
1599 * For NSTACKX_DFILE_FILE_DATA_FRAME, "dFileFrame" may be free when handling failed, and become invalid.
1600 */
1601 handleFrameRet = DFileSessionHandleFrame(session, dFileFrame, peerAddr);
1602 }
1603
1604 if (handleFrameRet != NSTACKX_EOK || type != NSTACKX_DFILE_FILE_DATA_FRAME) {
1605 /* For FILE_DATA frame, the memory is passed to file manager. */
1606 free(queueNode->frame);
1607 queueNode->frame = NULL;
1608 }
1609 free(queueNode);
1610 }
1611 ClearDFileFrameList(head);
1612 }
1613
ReadEventHandle(void * arg)1614 static void ReadEventHandle(void *arg)
1615 {
1616 DFileSession *session = arg;
1617 List newHead;
1618 ListInitHead(&newHead);
1619 struct timespec before, now;
1620 if (!session->partReadFlag) {
1621 NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1622 } else {
1623 ClockGetTime(CLOCK_MONOTONIC, &before);
1624 }
1625 while (session->inboundQueueSize && !session->closeFlag) {
1626 if (session->partReadFlag) {
1627 ClockGetTime(CLOCK_MONOTONIC, &now);
1628 if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
1629 break;
1630 }
1631 }
1632 if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1633 LOGE(TAG, "PthreadMutexLock error");
1634 return;
1635 }
1636 ListMove(&session->inboundQueue, &newHead);
1637 session->recvBlockNumInner += session->inboundQueueSize;
1638 session->inboundQueueSize = 0;
1639 if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1640 LOGE(TAG, "PthreadMutexUnlock error");
1641 break;
1642 }
1643 ProcessDFileFrameList(session, &newHead);
1644 }
1645 ClearDFileFrameList(&newHead);
1646 }
1647
DFileAddInboundQueue(DFileSession * session,const uint8_t * frame,size_t frameLength,struct sockaddr_in * peerAddr,uint8_t socketIndex)1648 static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
1649 struct sockaddr_in *peerAddr, uint8_t socketIndex)
1650 {
1651 if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
1652 if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
1653 LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
1654 }
1655 return NSTACKX_ENOMEM;
1656 }
1657 QueueNode *queueNode = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
1658 if (queueNode == NULL) {
1659 return NSTACKX_ENOMEM;
1660 }
1661
1662 if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1663 DestroyQueueNode(queueNode);
1664 return NSTACKX_EFAILED;
1665 }
1666 ListInsertTail(&session->inboundQueue, &queueNode->list);
1667 session->inboundQueueSize++;
1668 session->recvBlockNumDirect++;
1669
1670 if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1671 /* queue node is pushed to list, don't need to destory here. */
1672 return NSTACKX_EFAILED;
1673 }
1674
1675 return NSTACKX_EOK;
1676 }
1677
DFileReceiverUpdateSessionMeasureTime(DFileSession * session)1678 static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
1679 {
1680 if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
1681 ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
1682 }
1683 }
1684
BindServerRecvThreadToTargetCpu(DFileSession * session)1685 static void BindServerRecvThreadToTargetCpu(DFileSession *session)
1686 {
1687 int32_t cpu;
1688 int32_t cpus = GetCpuNum();
1689 if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
1690 cpu = CPU_IDX_2;
1691 StartThreadBindCore(cpu);
1692 return;
1693 }
1694 if (cpus >= FIRST_CPU_NUM_LEVEL) {
1695 return;
1696 } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1697 cpu = CPU_IDX_1;
1698 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1699 cpu = CPU_IDX_0;
1700 } else {
1701 return;
1702 }
1703 StartThreadBindCore(cpu);
1704 }
1705
PostReadEventToMainLoop(DFileSession * session)1706 static void PostReadEventToMainLoop(DFileSession *session)
1707 {
1708 if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
1709 return;
1710 }
1711 NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
1712 if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) != NSTACKX_EOK) {
1713 LOGE(TAG, "post read event failed");
1714 NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1715 session->mainLoopActiveReadFlag = NSTACKX_TRUE;
1716 } else {
1717 session->mainLoopActiveReadFlag = NSTACKX_FALSE;
1718 }
1719 }
1720
DFileSessionHandleReadBuffer(DFileSession * session,const uint8_t * buf,size_t bufLen,struct sockaddr_in * peerAddr,uint8_t socketIndex)1721 int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
1722 struct sockaddr_in *peerAddr, uint8_t socketIndex)
1723 {
1724 DFileFrame *dFileFrame = NULL;
1725 if (DecodeDFileFrame(buf, bufLen, &dFileFrame) != NSTACKX_EOK) {
1726 /* discard packet with non-zero trans id during cancel stage */
1727 LOGE(TAG, "drop malformed frame");
1728 return NSTACKX_EOK;
1729 }
1730
1731 int32_t ret = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
1732 if (ret == NSTACKX_ENOMEM) {
1733 return NSTACKX_EOK;
1734 }
1735 if (ret != NSTACKX_EOK) {
1736 LOGE(TAG, "frame add in bound queue failed :%d", ret);
1737 return NSTACKX_EFAILED;
1738 }
1739 PostReadEventToMainLoop(session);
1740 DFileRecvCalculateRate(session, dFileFrame, peerAddr);
1741 return NSTACKX_EOK;
1742 }
1743
DFileRecverPre(DFileSession * session)1744 static void DFileRecverPre(DFileSession *session)
1745 {
1746 SetThreadName(DFFILE_RECV_THREAD_NAME);
1747 DFileReceiverUpdateSessionMeasureTime(session);
1748 SetMaximumPriorityForThread();
1749 SetTidToBindInfo(session, POS_RECV_THERD_START);
1750 }
1751
RcverWaitSocket(DFileSession * session,uint8_t * canRead)1752 int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
1753 {
1754 if (session->acceptFlag == 0) {
1755 return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
1756 } else {
1757 return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
1758 }
1759 }
1760
DFileSocketRecv(DFileSession * session)1761 int32_t DFileSocketRecv(DFileSession *session)
1762 {
1763 return DFileSocketRecvSP(session);
1764 }
1765
DFileAcceptSocket(DFileSession * session)1766 int32_t DFileAcceptSocket(DFileSession *session)
1767 {
1768 session->acceptSocket = AcceptSocket(session->socket[0]);
1769 if (session->acceptSocket == NULL) {
1770 LOGI(TAG, "accept socket failed");
1771 return NSTACKX_EFAILED;
1772 } else {
1773 LOGI(TAG, "accept socket success");
1774 session->acceptFlag = 1;
1775 SetTcpKeepAlive(session->acceptSocket->sockfd);
1776 }
1777
1778 return NSTACKX_EOK;
1779 }
1780
DFileReceiverHandle(void * arg)1781 void *DFileReceiverHandle(void *arg)
1782 {
1783 DFileSession *session = arg;
1784 uint8_t canRead = NSTACKX_FALSE;
1785 int32_t ret = NSTACKX_EAGAIN;
1786 uint8_t isBind = NSTACKX_FALSE;
1787
1788 LOGI(TAG, "recv thread start");
1789 DFileRecverPre(session);
1790 while (!session->closeFlag) {
1791 if (ret == NSTACKX_EAGAIN || !canRead) {
1792 ret = RcverWaitSocket(session, &canRead);
1793 if (ret != NSTACKX_EOK || session->closeFlag) {
1794 break;
1795 }
1796 }
1797 if (!canRead) {
1798 continue;
1799 }
1800 if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1801 BindServerRecvThreadToTargetCpu(session);
1802 isBind = NSTACKX_TRUE;
1803 }
1804
1805 ret = DFileSocketRecv(session);
1806 if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1807 break;
1808 }
1809 }
1810 LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect, session->recvBlockNumInner);
1811 if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1812 PostFatalEvent(session);
1813 }
1814
1815 LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1816 return NULL;
1817 }
1818
DFileControlHandle(void * arg)1819 void *DFileControlHandle(void *arg)
1820 {
1821 SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1822 DFileSession *session = arg;
1823 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1824 DFileSenderControlHandle(session);
1825 } else {
1826 DFileReceiverControlHandle(session);
1827 }
1828 return NULL;
1829 }
1830
RealPathFileName(FileListInfo * fileListInfo)1831 static int32_t RealPathFileName(FileListInfo *fileListInfo)
1832 {
1833 uint32_t i;
1834 int32_t ret = NSTACKX_EOK;
1835 for (i = 0; i < fileListInfo->fileNum; i++) {
1836 char *tmpFileName = fileListInfo->files[i];
1837 char *tmpFileNameRes = realpath(tmpFileName, NULL);
1838 if (tmpFileNameRes == NULL) {
1839 ret = NSTACKX_EFAILED;
1840 break;
1841 }
1842 fileListInfo->files[i] = tmpFileNameRes;
1843 free(tmpFileName);
1844 }
1845 if (ret != NSTACKX_EOK) {
1846 LOGE(TAG, "realpath failed");
1847 }
1848 return ret;
1849 }
1850
FreeTransFileListInfo(FileListInfo * fileListInfo)1851 static void FreeTransFileListInfo(FileListInfo *fileListInfo)
1852 {
1853 free(fileListInfo->files);
1854 fileListInfo->files = NULL;
1855 if (fileListInfo->remotePath != NULL) {
1856 free(fileListInfo->remotePath);
1857 fileListInfo->remotePath = NULL;
1858 }
1859 free(fileListInfo);
1860 fileListInfo = NULL;
1861 }
1862
DFileStartTransInner(DFileSession * session,FileListInfo * fileListInfo)1863 static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
1864 {
1865 uint16_t transId = session->lastDFileTransId + 1;
1866 if (transId == 0) { /* overflow */
1867 transId = 1;
1868 }
1869
1870 PeerInfo *peerInfo = TransSelectPeerInfo(session);
1871 DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
1872 if (trans == NULL) {
1873 LOGE(TAG, "CreateTrans error");
1874 return NSTACKX_ENOMEM;
1875 }
1876
1877 if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
1878 LOGE(TAG, "set trans mtu failed");
1879 }
1880 if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
1881 DFileTransDestroy(trans);
1882 return NSTACKX_EFAILED;
1883 }
1884 int32_t ret = DFileTransSendFiles(trans, fileListInfo);
1885 if (ret != NSTACKX_EOK) {
1886 DFileTransDestroy(trans);
1887 LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
1888 return ret;
1889 }
1890 ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
1891 fileListInfo->userData);
1892 if (ret != NSTACKX_EOK) {
1893 LOGE(TAG, "DFileTransAddExtraInfo fail");
1894 DFileTransDestroy(trans);
1895 return NSTACKX_EFAILED;
1896 }
1897 trans->fileList->tarFlag = fileListInfo->tarFlag;
1898 trans->fileList->smallFlag = fileListInfo->smallFlag;
1899 trans->fileList->tarFile = fileListInfo->tarFile;
1900 trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;
1901
1902 fileListInfo->userData = NULL;
1903 ListInsertTail(&(session->dFileTransChain), &(trans->list));
1904 session->lastDFileTransId = transId;
1905 if (fileListInfo->smallFlag == NSTACKX_TRUE) {
1906 session->smallListProcessingCnt++;
1907 } else {
1908 session->fileListProcessingCnt++;
1909 }
1910 /* Elements in ctx->fileListInfo->files[] are reused by dFileTranns, so don't need to free. */
1911 FreeTransFileListInfo(fileListInfo);
1912 return NSTACKX_EOK;
1913 }
1914
DFileStartTrans(DFileSession * session,FileListInfo * fileListInfo)1915 int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
1916 {
1917 if (PthreadMutexLock(&session->transIdLock) != 0) {
1918 LOGE(TAG, "pthread mutex lock error");
1919 return NSTACKX_EFAILED;
1920 }
1921 int32_t ret = DFileStartTransInner(session, fileListInfo);
1922 if (PthreadMutexUnlock(&session->transIdLock) != 0) {
1923 LOGE(TAG, "pthread mutex unlock error");
1924 }
1925 return ret;
1926 }
1927
TerminateMainThreadInner(void * arg)1928 void TerminateMainThreadInner(void *arg)
1929 {
1930 DFileSession *session = (DFileSession *)arg;
1931 DFileSessionSetTerminateFlag(session);
1932 }
1933
StartDFileThreadsInner(DFileSession * session)1934 int32_t StartDFileThreadsInner(DFileSession *session)
1935 {
1936 if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
1937 LOGE(TAG, "Create mainloop thread failed");
1938 goto L_ERR_MAIN_LOOP_THREAD;
1939 }
1940
1941 if (CreateSenderThread(session) != NSTACKX_EOK) {
1942 goto L_ERR_SENDER_THREAD;
1943 }
1944
1945 if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
1946 LOGE(TAG, "Create receiver thread failed");
1947 goto L_ERR_RECEIVER_THREAD;
1948 }
1949
1950 if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
1951 LOGE(TAG, "Create control thread failed");
1952 goto L_ERR_CONTROL_THREAD;
1953 }
1954 return NSTACKX_EOK;
1955 L_ERR_CONTROL_THREAD:
1956 DFileSessionSetTerminateFlag(session);
1957 PthreadJoin(session->controlTid, NULL);
1958 session->receiverTid = INVALID_TID;
1959 L_ERR_RECEIVER_THREAD:
1960 DFileSessionSetTerminateFlag(session);
1961 PthreadJoin(session->senderTid[0], NULL);
1962 session->senderTid[0] = INVALID_TID;
1963 PostOutboundQueueWait(session);
1964 L_ERR_SENDER_THREAD:
1965 DFileSessionSetTerminateFlag(session);
1966 if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
1967 LOGE(TAG, "post terminate thread failed");
1968 }
1969 PthreadJoin(session->tid, NULL);
1970 session->tid = INVALID_TID;
1971 L_ERR_MAIN_LOOP_THREAD:
1972 return NSTACKX_EFAILED;
1973 }
1974
FileManagerMsgHandle(FileManagerMsgType msgType,int32_t errCode,void * context)1975 static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
1976 {
1977 DFileSession *session = context;
1978 if (msgType == FILE_MANAGER_INNER_ERROR) {
1979 LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
1980 PostFatalEvent(session);
1981 }
1982
1983 if (msgType == FILE_MANAGER_IN_PROGRESS) {
1984 NoticeSessionProgress(session);
1985 }
1986 }
1987
CreateFileManager(DFileSession * session,const uint8_t * key,uint32_t keyLen,uint8_t isSender,uint16_t connType)1988 int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
1989 uint16_t connType)
1990 {
1991 FileManagerMsgPara msgPara;
1992 if (session == NULL) {
1993 LOGE(TAG, "invalid input");
1994 return NSTACKX_EINVAL;
1995 }
1996 if (isSender && (connType != CONNECT_TYPE_P2P && connType != CONNECT_TYPE_WLAN)) {
1997 LOGE(TAG, "connType for sender is illagal");
1998 return NSTACKX_EINVAL;
1999 }
2000 if (keyLen > 0) {
2001 if (keyLen != AES_128_KEY_LENGTH || key == NULL) {
2002 LOGE(TAG, "error key or key len");
2003 return NSTACKX_EFAILED;
2004 }
2005 if (!IsCryptoIncluded()) {
2006 LOGE(TAG, "crypto is not opened");
2007 return NSTACKX_EFAILED;
2008 }
2009 }
2010 msgPara.epollfd = session->epollfd;
2011 msgPara.eventNodeChain = &session->eventNodeChain;
2012 msgPara.msgReceiver = FileManagerMsgHandle;
2013 msgPara.context = session;
2014 session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
2015 if (session->fileManager == NULL) {
2016 LOGE(TAG, "create filemanager failed");
2017 return NSTACKX_EFAILED;
2018 }
2019 return NSTACKX_EOK;
2020 }
2021
DestroyReceiverPipe(DFileSession * session)2022 void DestroyReceiverPipe(DFileSession *session)
2023 {
2024 if (session->receiverPipe[PIPE_OUT] != INVALID_PIPE_DESC) {
2025 CloseDesc(session->receiverPipe[PIPE_OUT]);
2026 session->receiverPipe[PIPE_OUT] = INVALID_PIPE_DESC;
2027 }
2028 if (session->receiverPipe[PIPE_IN] != INVALID_PIPE_DESC) {
2029 CloseDesc(session->receiverPipe[PIPE_IN]);
2030 session->receiverPipe[PIPE_IN] = INVALID_PIPE_DESC;
2031 }
2032 }
2033
2034