1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_session.h"
17
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_dfile_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 #include "nstackx_dfile_dfx.h"
38
39 #define TAG "nStackXDFile"
40
41 #define DEFAULT_WAIT_TIME_MS 1000
42 #define MULTI_THREADS_SOCKET_WAIT_TIME_MS 1
43 #define DEFAULT_NEGOTIATE_TIMEOUT 1000
44 #define TCP_NEGOTIATE_TIMEOUT 10000
45 #define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
46 #define MAX_NEGOTIATE_TIMEOUT_COUNT 3
47 #define MAX_UNPROCESSED_READ_EVENT_COUNT 3
48 #define MAX_RECVBUF_COUNT 130000
49 #define MAX_NOMEM_PRINT 10000
50
51 static void ReadEventHandle(void *arg);
52 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
53 DFileEventFunc g_dfileEventFunc;
54
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)55 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
56 const struct sockaddr_in *peerAddr, uint8_t socketIndex)
57 {
58 QueueNode *queueNode = NULL;
59
60 if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
61 return NULL;
62 }
63
64 queueNode = calloc(1, sizeof(QueueNode));
65 if (queueNode == NULL) {
66 return NULL;
67 }
68
69 queueNode->frame = calloc(1, length);
70 if (queueNode->frame == NULL) {
71 free(queueNode);
72 return NULL;
73 }
74 queueNode->length = length;
75 queueNode->sendLen = 0;
76
77 if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
78 DestroyQueueNode(queueNode);
79 return NULL;
80 }
81 if (peerAddr != NULL) {
82 if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
83 DestroyQueueNode(queueNode);
84 return NULL;
85 }
86 }
87 queueNode->socketIndex = socketIndex;
88
89 return queueNode;
90 }
91
/*
 * Free a QueueNode together with the frame buffer it owns.
 * Passing NULL is allowed and does nothing.
 */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }

    free(queueNode->frame);
    free(queueNode);
}
99
/*
 * Forward a message to the session's registered msgReceiver callback.
 *
 * The callback is invoked with the session id, the message type and the
 * message. Nothing is delivered (only an info log is written) when the
 * session is NULL or no receiver is registered.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        DFILE_LOGI(TAG, "session is NULL");
    } else if (session->msgReceiver == NULL) {
        DFILE_LOGI(TAG, "msgReceiver is NULL");
    } else {
        session->msgReceiver(session->sessionId, msgType, msg);
    }
}
114
/*
 * Reset the session's transfer-rate counters and record the start time.
 *
 * The reset only happens when the session has no work in flight: the
 * transfer chain must be empty and, for a client session, the pending list
 * (and the small-file list when NSTACKX_SMALL_FILE_SUPPORT is defined) must
 * be empty as well. Otherwise the function returns without touching anything.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }

    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    DFILE_LOGI(TAG, "begin to calculate transfer rate");
    session->bytesTransferred = 0;
    session->transCount = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
135
136 #ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start one transfer from the session's small-file list.
 *
 * Small lists are only considered while a regular file list is being
 * processed or when no regular file list is pending. Entries are popped one
 * by one; the first entry that DFileStartTrans() accepts ends the loop.
 * Entries that fail to start are reported through DFILE_ON_FILE_SEND_FAIL
 * and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    const int smallListAllowed = (session->fileListProcessingCnt > 0) || (session->fileListPendingCnt == 0);
    if (!smallListAllowed) {
        return NSTACKX_FALSE;
    }

    while (!ListIsEmpty(&session->smallFileLists)) {
        FileListInfo *entry = (FileListInfo *)ListPopFront(&session->smallFileLists);
        session->smallListPendingCnt--;
        if (entry == NULL) {
            continue;
        }

        int32_t startRet = DFileStartTrans(session, entry);
        if (startRet == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }

        /* Could not start: tell the user which file list failed, then drop it. */
        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", startRet);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = startRet;
        failMsg.fileList.files = (const char **)entry->files;
        failMsg.fileList.fileNum = entry->fileNum;
        failMsg.fileList.userData = entry->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(entry);
    }

    return NSTACKX_FALSE;
}
164 #endif
165
/*
 * Start the next transfer from the session's pending file lists.
 *
 * Entries are popped in order until one is accepted by DFileStartTrans() or
 * the list is empty. Every entry that fails to start is reported through
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 */
static void SendPendingList(DFileSession *session)
{
    while (!ListIsEmpty(&session->pendingFileLists)) {
        FileListInfo *entry = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        session->fileListPendingCnt--;
        if (entry == NULL) {
            continue;
        }

        int32_t startRet = DFileStartTrans(session, entry);
        if (startRet == NSTACKX_EOK) {
            /* One transfer is now running; leave the remaining entries queued. */
            return;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", startRet);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = startRet;
        failMsg.fileList.files = (const char **)entry->files;
        failMsg.fileList.fileNum = entry->fileNum;
        failMsg.fileList.userData = entry->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(entry);
    }
}
190
/*
 * Pick the next file list to send.
 *
 * With NSTACKX_SMALL_FILE_SUPPORT the small-file list is tried first and the
 * pending list is only used when no small transfer was started. Without it,
 * only the pending list exists.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
206
/*
 * Report session-wide progress to the user via DFILE_ON_SESSION_IN_PROGRESS.
 *
 * Total and transferred byte counts are read from the file manager. The
 * notification is sent only when both queries succeed and the transferred
 * count is non-zero and does not exceed the total.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }
    if (progress.transferUpdate.bytesTransferred > progress.transferUpdate.totalBytes) {
        return;
    }

    if (progress.transferUpdate.bytesTransferred > 0) {
        NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
    }
}
218
/*
 * Fill the transferUpdate part of a transfer message for terminal events.
 *
 * Only FILE_RECEIVED, FILE_SENT, FILE_RECEIVE_FAIL and FILE_SEND_FAIL are
 * handled; any other message type, or a NULL argument, leaves msg untouched.
 *
 * For the two success events the total size of the transfer is used as both
 * total and transferred bytes when it is non-zero. In every other case the
 * values come from FileManagerGetTransUpdateInfo(); if that query fails only
 * the transId has been written.
 *
 * For FILE_SENT a DFILE_ON_TRANS_IN_PROGRESS notification is emitted as well,
 * with the transId replaced by vtransRealTransId when the file list has
 * vtransFlag set.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
    DFileTransMsgType msgType, DFileTransMsg *msg)
{
    if (session == NULL || dFileTrans == NULL || msg == NULL) {
        return;
    }

    const int isSuccess = (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT);
    const int isFailure = (msgType == DFILE_TRANS_MSG_FILE_RECEIVE_FAIL ||
        msgType == DFILE_TRANS_MSG_FILE_SEND_FAIL);
    if (!isSuccess && !isFailure) {
        return;
    }

    msg->transferUpdate.transId = dFileTrans->transId;

    uint64_t total = 0;
    uint64_t transferred = 0;
    if (isSuccess) {
        /* A finished transfer has moved all of its bytes. */
        total = DFileTransGetTotalBytes(dFileTrans);
        transferred = total;
    }
    if (total == 0) {
        /* No usable total from the transfer itself: ask the file manager instead. */
        if (FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &transferred) !=
            NSTACKX_EOK) {
            return;
        }
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = transferred;

    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
257
/*
 * Wake the threads that push data out for the given socket.
 *
 * The outbound-queue semaphore of socketIndex is always posted. When the
 * caller is a sender and the session uses more than one client send thread,
 * the sendWait semaphore of the first (clientSendThreadNum - 1)
 * sendThreadPara entries is posted too.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);

    if (!isSender || session->clientSendThreadNum <= 1) {
        return;
    }

    uint16_t idx = 0;
    while (idx < session->clientSendThreadNum - 1) {
        SemPost(&session->sendThreadPara[idx].sendWait);
        idx++;
    }
}
267
/*
 * Account for a finished transfer and, once the session is idle, report the
 * average transfer rate.
 *
 * session    - session whose counters are updated.
 * totalBytes - size of the transfer that just finished.
 * msgType    - only DFILE_TRANS_MSG_FILE_SENT and DFILE_TRANS_MSG_END are
 *              counted; every other type returns immediately.
 *
 * bytesTransferred saturates at UINT64_MAX instead of wrapping. The rate is
 * computed and reported (DFILE_ON_SESSION_TRANSFER_RATE plus
 * TransferCompleteEvent) only when the transfer chain is empty and, for a
 * client session, no file list is waiting to be sent. A measured duration of
 * 0 ms suppresses the report to avoid dividing by zero.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }
    if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
        session->bytesTransferred += totalBytes;
    } else {
        session->bytesTransferred = UINT64_MAX;
    }
    session->transCount++;
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
        || !ListIsEmpty(&session->smallFileLists))) {
        return;
    }
#else
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
        return;
    }
#endif
    struct timespec endTs;
    ClockGetTime(CLOCK_MONOTONIC, &endTs);

    uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
    if (spendTime == 0) {
        return;
    }
    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
    /*
     * %llu requires an unsigned long long argument; a 64-bit byte counter may be
     * unsigned long on LP64 targets, so convert explicitly to keep the format
     * string and the argument in agreement.
     */
    DFILE_LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
        session->transCount, (unsigned long long)session->bytesTransferred, spendTime, rate);
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    msgData.rate = (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);

    TransferCompleteEvent(rate);
}
309
/*
 * Tear down a transfer once it has reached a terminal event.
 *
 * Terminal events are FILE_RECEIVE_FAIL, FILE_SENT, FILE_SEND_FAIL and END;
 * any other message type is ignored. For a terminal event the trans id is
 * marked STATE_TRANS_DONE, the peer's running-transfer counter is decreased,
 * and the transfer is unlinked and destroyed. A client session then decreases
 * the matching "processing" counter and schedules the next file list. Last,
 * the session transfer-rate bookkeeping is updated.
 *
 * dFileTrans must not be used by the caller after a terminal event.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    const int isTerminal = (msgType == DFILE_TRANS_MSG_FILE_RECEIVE_FAIL ||
        msgType == DFILE_TRANS_MSG_FILE_SENT ||
        msgType == DFILE_TRANS_MSG_FILE_SEND_FAIL ||
        msgType == DFILE_TRANS_MSG_END);
    if (!isTerminal) {
        return;
    }

    /* Capture everything needed from the transfer before it is destroyed. */
    PeerInfo *peer = (PeerInfo *)dFileTrans->context;
    uint8_t wasSmallList = dFileTrans->fileList->smallFlag;

    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans id state fail");
    }
    if (peer->currentTransCount > 0) {
        peer->currentTransCount--;
    }

    ListRemoveNode(&dFileTrans->list);
    uint64_t finishedBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (wasSmallList == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }

    CalculateSessionTransferRate(session, finishedBytes, msgType);
}
335
/*
 * Event callback installed on every DFileTrans (see CreateTrans).
 *
 * The transfer's context is the owning PeerInfo, from which the session is
 * obtained. The progress fields of msg are refreshed first, then the event is
 * either translated into a user-visible DFILE_ON_* notification or handled
 * internally (waking the send threads, processing the other transfers of the
 * session). Finally CheckTransDone() destroys the transfer when the event is
 * terminal, so dFileTrans must not be touched after that call.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
    DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *owner = peer->session;
    int forwardToUser = 1;
    DFileMsgType userEvent = DFILE_ON_TRANS_IN_PROGRESS;

    UpdateMsgProcessInfo(owner, dFileTrans, msgType, msg);

    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            userEvent = DFILE_ON_FILE_LIST_RECEIVED;
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED: /* Receiver receive all the file data */
            userEvent = DFILE_ON_FILE_RECEIVE_SUCCESS;
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            userEvent = DFILE_ON_FILE_RECEIVE_FAIL;
            break;
        case DFILE_TRANS_MSG_FILE_SENT: /* Sender send TRANSFER DONE ACK frame and come to end */
            NoticeSessionProgress(owner);
            userEvent = DFILE_ON_FILE_SEND_SUCCESS;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            userEvent = DFILE_ON_FILE_SEND_FAIL;
            break;
        case DFILE_TRANS_MSG_IN_PROGRESS:
            userEvent = DFILE_ON_TRANS_IN_PROGRESS;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            /* Internal event: data is ready, no user notification. */
            WakeSendThread(owner, dFileTrans->isSender, peer->socketIndex);
            forwardToUser = 0;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            /* Internal event: let the other transfers of the session make progress. */
            ProcessSessionTrans(owner, dFileTrans->transId);
            forwardToUser = 0;
            break;
        default:
            DFILE_LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            forwardToUser = 0;
            break;
    }

    if (forwardToUser) {
        NotifyMsgRecver(owner, userEvent, msg);
    }

    CheckTransDone(owner, dFileTrans, msgType);
}
377
ServerSettingTimeoutHandle(void * data)378 static void ServerSettingTimeoutHandle(void *data)
379 {
380 PeerInfo *peerInfo = data;
381 ListRemoveNode(&peerInfo->list);
382 TimerDelete(peerInfo->settingTimer);
383 peerInfo->session->peerInfoCnt--;
384 free(peerInfo);
385 DFILE_LOGI(TAG, "DFileServer Setting Negotationion timeout");
386 }
387
ClientSettingTimeoutHandle(void * data)388 static void ClientSettingTimeoutHandle(void *data)
389 {
390 PeerInfo *peerInfo = data;
391 uint8_t cnt = peerInfo->settingTimeoutCnt++;
392 DFileMsg msgData;
393 uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
394 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
395 if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
396 TimerDelete(peerInfo->settingTimer);
397 peerInfo->settingTimer = NULL;
398 peerInfo->settingTimeoutCnt = 0;
399 msgData.errorCode = NSTACKX_EFAILED;
400 DFILE_LOGI(TAG, "DFileClient Setting Negotationion timeout");
401 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
402 } else {
403 DFileSessionSendSetting(peerInfo);
404 DFILE_LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
405 if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
406 msgData.errorCode = NSTACKX_EFAILED;
407 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
408 DFILE_LOGE(TAG, "Timer setting timer fail");
409 }
410 }
411 }
412
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)413 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
414 {
415 List *pos = NULL;
416 DFileTrans *trans = NULL;
417
418 if (dFileTransChain == NULL || transId == 0) {
419 return NULL;
420 }
421
422 LIST_FOR_EACH(pos, dFileTransChain) {
423 trans = (DFileTrans *)pos;
424 if (trans->transId == transId) {
425 return trans;
426 }
427 }
428 return NULL;
429 }
430
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)431 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
432 {
433 List *pos = NULL;
434 PeerInfo *peerInfo = NULL;
435 LIST_FOR_EACH(pos, &session->peerInfoChain) {
436 peerInfo = (PeerInfo *)pos;
437 if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
438 peerInfo->session->sessionId == session->sessionId) {
439 return peerInfo;
440 }
441 }
442 return NULL;
443 }
444
/*
 * Encode a SETTING frame for the peer and send it, arming the negotiation
 * timer on first use.
 *
 * The frame carries the peer's connection type, the local MTU, the session's
 * Link Sequence capability bit and the RECV_FEEDBACK caps-check flag; cipher
 * capabilities are added when the file manager has a key configured.
 *
 * Timer handling:
 *  - If peerInfo->settingTimer already exists, the frame is simply re-sent.
 *  - Otherwise a timer is started first: a client uses the TCP or default
 *    negotiate timeout with ClientSettingTimeoutHandle, any other session
 *    type uses MAX_SERVER_NEGOTIATE_VALID_TIMEOUT with
 *    ServerSettingTimeoutHandle. If the timer cannot be created nothing is
 *    sent; a client additionally reports DFILE_ON_CONNECT_FAIL. Callers can
 *    detect this by checking that settingTimer is still NULL afterwards.
 *
 * A write result other than the full frame length or NSTACKX_EAGAIN is
 * reported to the user as DFILE_ON_CONNECT_FAIL.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    DFileSession *session = peerInfo->session;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    SettingFrame settingFramePara;

    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    /*
     * Only some members are assigned below; clear the whole structure so the
     * encoder never reads indeterminate values from the remaining ones.
     */
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    if (session->fileManager->keyLen) {
        DFileGetCipherCaps(session, &settingFramePara);
    }
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    if (peerInfo->settingTimer == NULL) {
        /* First setting frame for this negotiation round: arm the timeout before sending. */
        if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
            uint32_t timeout = CapsTcp(session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
            peerInfo->settingTimer = TimerStart(session->epollfd, timeout,
                NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "setting timmer creat fail");
                data.errorCode = NSTACKX_EFAILED;
                NotifyMsgRecver(session, DFILE_ON_CONNECT_FAIL, &data);
                return;
            }
        } else {
            peerInfo->settingTimer = TimerStart(session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                return;
            }
        }
    }

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(session, DFILE_ON_CONNECT_FAIL, &data);
    }
}
495
/*
 * Apply a negotiated DFileConfig to a peer and to the session's file manager.
 *
 * The peer's loss-rate history, fast-start counter and measurement timestamp
 * are reset, the maximum send rate and data frame size are taken from the
 * config, and the initial send rate is the configured rate divided by a
 * WLAN- or P2P-specific divisor. Outside LiteOS builds, a WLAN peer without
 * a known wifi rate gets its initial rate from NSTACKX_WLAN_INIT_RATE
 * instead. The result is never below NSTACKX_MIN_SENDRATE.
 *
 * The file manager is told the maximum frame length and, for a server
 * session, the receive parameters for the connection type; failures of
 * either call are only logged.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    const int overWlan = (connType == CONNECT_TYPE_WLAN);

    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;

    if (overWlan) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    if (overWlan && (!peerInfo->gotWifiRate)) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif

    /* Enforce the lower bound on the initial rate. */
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set max frame length");
    }

    DFILE_LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
        connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    if (FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set recv para");
    }
}
532
/*
 * Record the outcome of a decoded setting frame in the peer.
 *
 * Stores the negotiation state, the peer's MTU, connection type and remote
 * session id, and sets mtuInuse to the smaller of the local and peer MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    peerInfo->state = state;
    peerInfo->connType = settingFrame->connType;
    peerInfo->remoteSessionId = settingFrame->header.sessionId;
    peerInfo->mtu = settingFrame->mtu;

    uint16_t ownMtu = peerInfo->localMtu;
    if (ownMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = ownMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }
}
542
/*
 * Client-side handling of the SETTING frame sent back by the server.
 *
 * session    - the client session that received the frame.
 * dFileFrame - raw frame; decoded as a SettingFrame.
 * peerAddr   - address the frame came from; must match a known PeerInfo.
 *
 * Frames that fail to decode or advertise an MTU of 0 are dropped, as are
 * frames from an unknown peer. Otherwise the negotiation timer is stopped,
 * the peer is marked SETTING_NEGOTIATED, the MTU in use is pushed to every
 * transfer of the session, DFILE_ON_CONNECT_SUCCESS is reported, the send
 * configuration for the negotiated MTU / connection type is applied, and the
 * Link Sequence capability is cleared on the session when the server did not
 * advertise it. Finally the cipher type is chosen from the server's frame.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    /* unsupport version */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }
    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer setting, maybe be attacked");
        return;
    }
    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;

    /* The reply arrived: the retry timer is no longer needed. */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "set trans mtu failed");
        }
    }
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }
    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* The capability bit is set here, so the server DOES accept Link Sequence. */
        DFILE_LOGI(TAG, "server replies to use Link Sequence");
    } else {
        DFILE_LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
    DFileChooseCipherType(&hostSettingFrame, session);
}
586
/*
 * Return the local MTU for a socket protocol.
 *
 * UDP and TCP both use NSTACKX_DEFAULT_FRAME_SIZE. D2D is not supported: an
 * error is logged and 0 is returned. Any other protocol also yields 0.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    /* for udp, return NSTACKX_DEFAULT_FRAME_SIZE, for D2D, need call D2D MTU interface */
    switch (protocol) {
        case NSTACKX_PROTOCOL_UDP:
        case NSTACKX_PROTOCOL_TCP:
            return NSTACKX_DEFAULT_FRAME_SIZE;
        case NSTACKX_PROTOCOL_D2D:
            DFILE_LOGE(TAG, "d2d not support");
            return 0;
        default:
            return 0;
    }
}
601
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)602 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
603 uint16_t connType, uint8_t socketIndex)
604 {
605 if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
606 return NULL;
607 }
608 PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
609 if (peerInfo == NULL) {
610 return NULL;
611 }
612 peerInfo->session = session;
613 peerInfo->dstAddr = *dstAddr;
614 peerInfo->connType = connType;
615 peerInfo->socketIndex = socketIndex;
616 peerInfo->localMtu = DFileGetMTU(session->protocol);
617 session->peerInfoCnt++;
618 peerInfo->gotWifiRate = 0;
619 peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
620 peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
621
622 if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
623 peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
624 DFILE_LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
625 }
626
627 if (peerMtu == 0) {
628 return peerInfo;
629 }
630 peerInfo->mtu = peerMtu;
631 peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
632 return peerInfo;
633 }
634
/*
 * Server-side evaluation of the capabilities announced in a client's
 * SETTING frame.
 *
 * session          - session whose capability / capsCheck masks are updated.
 * hostSettingFrame - decoded setting frame received from the client.
 *
 * Link Sequence: taken from the frame's capability field; when the client
 * did not set NSTACKX_CAPS_LINK_SEQUENCE the bit is cleared on the session.
 *
 * Recv feedback: DFileSessionSendSetting() transmits
 * NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK in the capsCheck field and masks the
 * capability field down to the Link Sequence bit, so the flag must be read
 * from capsCheck. The session's capsCheck mask is set or cleared to match.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    if (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        DFILE_LOGI(TAG, "client wants to enable Link Sequence");
    } else {
        DFILE_LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }

    if (hostSettingFrame->capsCheck & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) {
        DFILE_LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        DFILE_LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
652
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)653 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
654 uint8_t socketIndex)
655 {
656 PeerInfo *peerInfo = NULL;
657
658 peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
659 if (peerInfo == NULL) {
660 return NULL;
661 }
662
663 peerInfo->remoteSessionId = frame->header.sessionId;
664 peerInfo->state = SETTING_NEGOTIATING;
665 DFileSessionSendSetting(peerInfo);
666 if (peerInfo->settingTimer == NULL) {
667 free(peerInfo);
668 session->peerInfoCnt--;
669 return NULL;
670 }
671 ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
672 return peerInfo;
673 }
674
/*
 * Server-side handling of a SETTING frame received from a client.
 *
 * Frames that fail to decode or carry an MTU of 0 are ignored. For a peer
 * that is already known, the frame is dropped once
 * MAX_NEGOTIATE_TIMEOUT_COUNT settings have been answered; otherwise the
 * peer data is refreshed and our setting frame is sent again. For an unknown
 * peer a new PeerInfo is allocated (which also sends the reply).
 *
 * After a reply has been sent the per-peer setting counter is incremented,
 * the remote version is stored, the send configuration is applied, and the
 * client's capabilities and cipher type are evaluated.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame clientSetting;
    DFileConfig config;
    (void)memset_s(&clientSetting, sizeof(clientSetting), 0, sizeof(clientSetting));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));

    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &clientSetting) != NSTACKX_EOK) {
        return;
    }
    if (clientSetting.mtu == 0) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        /* First contact from this address. */
        peer = AllocPeerInfo(session, peerAddr, &clientSetting, socketIndex);
        if (peer == NULL) {
            return;
        }
    } else {
        if (peer->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
            DFILE_LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
            return;
        }
        DFileSessionSetPeerInfo(peer, SETTING_NEGOTIATING, &clientSetting);
        DFileSessionSendSetting(peer);
    }

    peer->settingTimeoutCnt++;
    DFILE_LOGI(TAG, "DFileServer response Setting Frame. count %u", peer->settingTimeoutCnt);
    peer->remoteDFileVersion = clientSetting.dFileVersion;
    if (GetDFileConfig(&config, peer->mtuInuse, clientSetting.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, clientSetting.connType, peer);
    }
    HandleLinkSeqCap(session, &clientSetting);
    DFileChooseCipherType(&clientSetting, session);
}
714
/*
 * Dispatch a received SETTING frame to the server or client handler
 * according to the session type. Other session types are ignored.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    switch (session->sessionType) {
        case DFILE_SESSION_TYPE_SERVER:
            DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
            break;
        case DFILE_SESSION_TYPE_CLIENT:
            DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
            break;
        default:
            break;
    }
}
726
/*
 * React to an RST frame carrying NSTACKX_DFILE_WITHOUT_SETTING_ERROR.
 *
 * The RST is ignored unless it comes from a known peer. Otherwise the MTU of
 * every transfer in the session is set to 0 and the setting negotiation with
 * the matching peer is restarted: its timer is deleted, its state goes back
 * to SETTING_NEGOTIATING and a new setting frame is sent.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *node = NULL;

    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer rst, maybe be attacked");
        return;
    }

    /*
     * when client recv peer RST NSTACKX_DFILE_WITHOUT_SETTING_ERROR, Set Trans MTU 0,
     * and will reset after new setting negotiation.
     */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)node;
        if (DFileTransSetMtu(trans, 0) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    /* Restart negotiation with the first peer entry that matches the address and session. */
    LIST_FOR_EACH(node, &session->peerInfoChain) {
        PeerInfo *candidate = (PeerInfo *)node;
        if (memcmp(&candidate->dstAddr, peerAddr, sizeof(struct sockaddr_in)) != 0) {
            continue;
        }
        if (candidate->session->sessionId != session->sessionId) {
            continue;
        }
        TimerDelete(candidate->settingTimer);
        candidate->settingTimer = NULL;
        candidate->state = SETTING_NEGOTIATING;
        DFILE_LOGD(TAG, "Send Setting Frame");
        DFileSessionSendSetting(candidate);
        break;
    }
}
762
/*
 * Handle a received RST frame.
 *
 * The frame is decoded to obtain its error code; undecodable frames are
 * dropped. Only NSTACKX_DFILE_WITHOUT_SETTING_ERROR is acted upon, every
 * other code is logged as unsupported.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t rstCode = 0;
    if (DecodeRstFrame((RstFrame *)dFileFrame, &rstCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    /* The header field is converted with ntohs before being logged. */
    uint16_t rstTransId = ntohs(dFileFrame->header.transId);
    DFILE_LOGD(TAG, "handle RST (%hu) frame, transId %hu", rstCode, rstTransId);

    if (rstCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
    } else {
        DFILE_LOGE(TAG, "Unspported error code %hu", rstCode);
    }
}
782
/*
 * Update the per-thread stop-send counters from a back-pressure report.
 *
 * Under backPressLock, the first `count` entries of session->stopSendCnt are
 * incremented when recvListOverIo equals 1 and reset to 0 otherwise. Nothing
 * is changed if the lock cannot be taken; lock and unlock failures are
 * logged.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    const int receiverOverloaded = (backPress.recvListOverIo == 1);
    for (uint32_t slot = 0; slot < count; slot++) {
        if (receiverOverloaded) {
            session->stopSendCnt[slot]++;
        } else {
            session->stopSendCnt[slot] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
809
/*
 * Handle a back-pressure frame from a peer.
 *
 * The frame is only accepted from a known peer and must decode
 * successfully. Its content is then applied to the stop-send counters of
 * all client send threads (clientSendThreadNum entries) and logged.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return;
    }

    DataBackPressure report;
    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &report) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, report, session->clientSendThreadNum);

    DFILE_LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
        report.recvListOverIo, report.recvBufThreshold,
        report.stopSendPeriod);
}
832
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)833 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
834 {
835 DFileTransPara transPara;
836
837 if (peerInfo == NULL) {
838 return NULL;
839 }
840 (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
841 transPara.isSender = isSender;
842 transPara.transId = transId; /* for receiver, use transId of sender */
843 transPara.fileManager = session->fileManager;
844 transPara.writeHandle = DFileWriteHandle;
845 transPara.msgReceiver = DTransMsgReceiver;
846 transPara.connType = peerInfo->connType;
847 transPara.context = peerInfo;
848 transPara.session = session;
849 transPara.onRenameFile = session->onRenameFile;
850
851 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
852 return DFileTransCreate(&transPara);
853 }
854
/*
 * Route a frame to the transfer identified by its header transId, creating a
 * receiver-side transfer on demand.
 *
 * - The sender must be a known peer, otherwise NSTACKX_EFAILED.
 * - On a server session, a peer still in SETTING_NEGOTIATING is moved to
 *   SETTING_NEGOTIATED and its setting timer is deleted.
 * - transId 0 is rejected.
 * - If no transfer exists for the id, only a FILE_HEADER frame whose id is not
 *   reported as done by IsTransIdDone may create one. A creation failure is
 *   reported to the message receiver as DFILE_ON_FATAL_ERROR / NSTACKX_ENOMEM.
 *
 * Returns NSTACKX_EFAILED on any rejection, otherwise whatever
 * HandleDFileFrame returns.
 */
static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        DFILE_LOGI(TAG, "can't find peerInfo");
        return NSTACKX_EFAILED;
    }

    if (peer->session->sessionType == DFILE_SESSION_TYPE_SERVER && peer->state == SETTING_NEGOTIATING) {
        peer->state = SETTING_NEGOTIATED;
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
    }

    const uint16_t frameTransId = ntohs(dFileFrame->header.transId);
    if (frameTransId == 0) {
        DFILE_LOGE(TAG, "transId is 0");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), frameTransId);
    if (trans != NULL) {
        /* Common case: the transfer already exists. */
        return HandleDFileFrame(trans, dFileFrame);
    }

    /* Only HEADER frame can start dfile transfer (receiver) */
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
        DFILE_LOGE(TAG, "trans %u is NULL && type is %u", frameTransId, dFileFrame->header.type);
        return NSTACKX_EFAILED;
    }
    if (IsTransIdDone(session, frameTransId) == NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }

    trans = CreateTrans(frameTransId, session, peer, NSTACKX_FALSE);
    if (trans == NULL) {
        DFileMsg fatalMsg;
        (void)memset_s(&fatalMsg, sizeof(fatalMsg), 0, sizeof(fatalMsg));
        fatalMsg.errorCode = NSTACKX_ENOMEM;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &fatalMsg);
        DFILE_LOGE(TAG, "trans is NULL");
        return NSTACKX_EFAILED;
    }

    /* An MTU failure is logged but does not abort the new transfer. */
    if (DFileTransSetMtu(trans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    CalculateSessionTransferRatePrepare(session);
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    peer->currentTransCount++;

    return HandleDFileFrame(trans, dFileFrame);
}
904
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)905 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
906 {
907 PeerInfo *peerInfo = context;
908 DFileSession *session = peerInfo->session;
909 QueueNode *queueNode = NULL;
910 struct sockaddr_in peerAddr;
911
912 peerAddr = peerInfo->dstAddr;
913 queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
914 if (queueNode == NULL) {
915 return NSTACKX_ENOMEM;
916 }
917
918 if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
919 DFILE_LOGE(TAG, "pthread mutex lock failed");
920 DestroyQueueNode(queueNode);
921 return NSTACKX_EFAILED;
922 }
923 ListInsertTail(&session->outboundQueue, &queueNode->list);
924 session->outboundQueueSize++;
925 if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
926 /* Don't need to free node, as it's mount to outboundQueue */
927 DFILE_LOGE(TAG, "pthread mutex unlock failed");
928 return NSTACKX_EFAILED;
929 }
930 SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
931 return (int32_t)len;
932 }
933
BindMainLoopToTargetCpu(void)934 static void BindMainLoopToTargetCpu(void)
935 {
936 int32_t cpu;
937 int32_t cpus = GetCpuNum();
938 if (cpus >= FIRST_CPU_NUM_LEVEL) {
939 return;
940 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
941 cpu = CPU_IDX_0;
942 } else {
943 return;
944 }
945 StartThreadBindCore(cpu);
946 }
947
/*
 * Compute the timeout to pass to the next EpollLoop call.
 *
 * Returns 0 when the main loop reads actively and inbound frames are queued.
 * Otherwise returns the smallest non-negative DFileTransGetTimeout value over
 * all transfers of the session, capped at DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t transTimeout = DFileTransGetTimeout((DFileTrans *)node);
        /* Negative values are skipped; they never shorten the wait. */
        if (transTimeout >= 0 && transTimeout < shortest) {
            shortest = transTimeout;
        }
    }

    return (shortest > DEFAULT_WAIT_TIME_MS) ? DEFAULT_WAIT_TIME_MS : shortest;
}
970
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)971 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
972 {
973 List *tmp = NULL;
974 List *pos = NULL;
975 LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
976 DFileTrans *trans = (DFileTrans *)pos;
977 if (trans->transId != exceptTransId) {
978 DFileTransProcess(trans);
979 }
980 }
981 }
982
NotifyBindPort(const DFileSession * session)983 static void NotifyBindPort(const DFileSession *session)
984 {
985 DFileMsg data;
986 int32_t socketNum;
987 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
988 socketNum = 1;
989
990 for (int i = 0; i < socketNum; i++) {
991 data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
992 data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
993 }
994 NotifyMsgRecver(session, DFILE_ON_BIND, &data);
995 }
996
DFileMainLoop(void * arg)997 void *DFileMainLoop(void *arg)
998 {
999 DFileSession *session = arg;
1000 int32_t ret = NSTACKX_EOK;
1001 DFileMsg msgData;
1002 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
1003 uint8_t isBind = NSTACKX_FALSE;
1004 DFILE_LOGI(TAG, "main thread start");
1005 SetThreadName(DFFILE_MAIN_THREAD_NAME);
1006 SetMaximumPriorityForThread();
1007 SetTidToBindInfo(session, POS_MAIN_THERD_START);
1008 NotifyBindPort(session);
1009 while (!session->closeFlag) {
1010 int64_t minTimeout = GetEpollWaitTimeOut(session);
1011 ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
1012 if (ret == NSTACKX_EFAILED) {
1013 DFILE_LOGE(TAG, "epoll wait failed");
1014 break;
1015 }
1016 if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1017 BindMainLoopToTargetCpu();
1018 isBind = NSTACKX_TRUE;
1019 }
1020 ProcessSessionTrans(session, 0);
1021 if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
1022 session->partReadFlag = NSTACKX_TRUE;
1023 ReadEventHandle(session);
1024 session->partReadFlag = NSTACKX_FALSE;
1025 }
1026 }
1027
1028 if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
1029 msgData.errorCode = NSTACKX_EFAILED;
1030 NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
1031 }
1032
1033 /* Notify sender thread to terminate */
1034 PostOutboundQueueWait(session);
1035
1036 /* Unblock "select" and notify receiver thread to terminate */
1037 NotifyPipeEvent(session);
1038 return NULL;
1039 }
1040
/*
 * Reset the peer's amended send rate to its plain send rate and log the
 * current send rate, measured frame rate and the new amended value.
 */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    peerInfo->amendSendRate = peerInfo->sendRate;

    DFILE_LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
        peerInfo->sendRate,
        peerInfo->sendFrameRate,
        peerInfo->amendSendRate);
}
1047
/*
 * Periodically compute and log sender statistics for a client session.
 *
 * Does nothing on non-client sessions, or while the time elapsed since
 * peerInfo->startTime does not exceed peerInfo->rateStateInterval. Once the
 * interval has passed it:
 *   - for the peer with socketIndex 0 only, derives the file manager's IO
 *     read rate from iorBytes and resets iorBytes / sendListFullTimes;
 *   - derives the peer's send frame rate and send throughput from sendCount;
 *   - turns the accumulated qdiscAveLeft into an average when samples exist;
 *   - logs one statistics line;
 *   - resets amendSendRate, the session and peer statistics, and restarts the
 *     measurement window.
 *
 * Fix relative to the previous version: the log format was assembled from
 * adjacent string literals without separators, so fields ran together in the
 * output (e.g. "ave qdisc 5totalRecvBlocks 7"). A space now separates every
 * field; the conversion specifiers and their arguments are unchanged.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    uint64_t sendCount;

    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* just calculate io rate */
    if (measureElapse > peerInfo->rateStateInterval) {
        /* ONLY PEER 0 calculate io rate */
        sendCount = (uint64_t)peerInfo->sendCount;
        if (peerInfo->socketIndex == 0) {
            session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
                NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
            DFILE_LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
                session->fileManager->sendListFullTimes);
            session->fileManager->sendListFullTimes = 0;
            session->fileManager->iorBytes = 0;
        }

        peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        /* qdiscAveLeft holds a running sum until here; convert it to an average. */
        if (peerInfo->qdiscSearchNum != 0) {
            peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
        }

        DFILE_LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu, "
            "measureElapse %llu sendFrameRate %u %uMB/s, "
            "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u, "
            "min qdisc %u max qdisc %u search num %u ave qdisc %u "
            "totalRecvBlocks %llu socket:%u "
            "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
            peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
            peerInfo->sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
            NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
            NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
            NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
            peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
            NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
            peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);
        AmendPeerInfoSendRate(peerInfo);
        ClearSessionStats(session);
        ClearPeerinfoStats(peerInfo);
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1099
/*
 * Sum the retryCount of every transfer in the session and store the total in
 * peerInfo->allDtransRetryCount.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t retrySum = 0;
    List *cursor = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(cursor, &session->dFileTransChain) {
        retrySum += ((DFileTrans *)cursor)->retryCount;
    }

    peerInfo->allDtransRetryCount = retrySum;
}
1111
/*
 * Update receive-side statistics for a FILE_DATA frame on a server session.
 *
 * Frames of other types, non-server sessions and unknown peers are ignored.
 * For an accepted frame peerInfo->recvCount is incremented, then two
 * independent measurement windows are checked against
 * peerInfo->rateStateInterval:
 *   - session window (session->measureBefore): refreshes the file manager's
 *     IO write rate, iowCount and iowMaxRate, logs them, and clears iowBytes;
 *   - peer window (peerInfo->startTime): refreshes the peer's receive frame
 *     rate and throughput and clears recvCount.
 * Each window is restarted after it has been evaluated.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER ||
        dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        return;
    }

    uint32_t ctrlTimeout;
    if (peer->connType == CONNECT_TYPE_P2P) {
        ctrlTimeout = NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT;
    } else {
        ctrlTimeout = NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;
    }

    struct timespec current;
    peer->recvCount++;
    ClockGetTime(CLOCK_MONOTONIC, &current);

    /* Session-wide IO write statistics. */
    uint64_t sessionElapse = GetTimeDiffUs(&current, &session->measureBefore);
    if (sessionElapse > peer->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / sessionElapse / DFILE_KILOBYTES);
        session->fileManager->iowCount = session->fileManager->iowBytes / peer->dataFrameSize *
            (NSTACKX_MILLI_TICKS * ctrlTimeout - peer->rateStateInterval) / sessionElapse;
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        DFILE_LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", sessionElapse,
            session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    /* Per-peer receive rate; measured against the same "current" timestamp. */
    uint64_t peerElapse = GetTimeDiffUs(&current, &peer->startTime);
    if (peerElapse > peer->rateStateInterval) {
        uint64_t framesReceived = (uint64_t)peer->recvCount;
        peer->recvFrameRate = (uint32_t)(framesReceived * DATA_FRAME_SEND_INTERVAL_US / peerElapse);
        peer->recvCountRateMB = (uint32_t)(framesReceived * peer->dataFrameSize *
            NSTACKX_MICRO_TICKS / peerElapse / DFILE_MEGABYTES);
        peer->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
    }
}
1158
CheckElapseTime(const struct timespec * before,uint64_t overRun)1159 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1160 {
1161 struct timespec now;
1162 ClockGetTime(CLOCK_MONOTONIC, &now);
1163 uint64_t elapseUs = GetTimeDiffUs(&now, before);
1164 elapseUs += overRun;
1165 if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1166 if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1167 DFILE_LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1168 }
1169 return 0;
1170 } else {
1171 uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1172 return delta;
1173 }
1174 }
1175
BindClientSendThreadToTargetCpu(uint32_t idx)1176 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1177 {
1178 int32_t cpu;
1179 int32_t cpus = GetCpuNum();
1180 if (cpus >= FIRST_CPU_NUM_LEVEL) {
1181 return;
1182 } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1183 cpu = CPU_IDX_1 + (int32_t)idx;
1184 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1185 cpu = CPU_IDX_1;
1186 } else {
1187 return;
1188 }
1189 if (cpu > cpus) {
1190 cpu = cpus - 1;
1191 }
1192 StartThreadBindCore(cpu);
1193 }
1194
/*
 * For a client session, restart the measurement window of the peer attached
 * to socketIndex and bind the calling thread via
 * BindClientSendThreadToTargetCpu(0). Does nothing for other session types or
 * when no peer is found for the socket index.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        return;
    }

    ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
    BindClientSendThreadToTargetCpu(0);
}
1206
TerminateMainThreadFatalInner(void * arg)1207 static void TerminateMainThreadFatalInner(void *arg)
1208 {
1209 DFileSession *session = (DFileSession *)arg;
1210 DFileSessionSetFatalFlag(session);
1211 }
1212
/*
 * Ask the main loop to set the session's fatal flag by posting
 * TerminateMainThreadFatalInner as an event. If the event cannot be posted,
 * the flag is set directly from the calling thread instead.
 */
static void PostFatalEvent(DFileSession *session)
{
    int32_t postRet = PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session);
    if (postRet == NSTACKX_EOK) {
        return;
    }

    DFILE_LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1220
GetSocketWaitMs(uint32_t threadNum)1221 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1222 {
1223 if (threadNum > 1) {
1224 return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1225 }
1226 return DEFAULT_WAIT_TIME_MS;
1227 }
1228
SetSendThreadName(uint32_t threadIdx)1229 static void SetSendThreadName(uint32_t threadIdx)
1230 {
1231 char name[MAX_THREAD_NAME_LEN] = {0};
1232 if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1233 DFILE_LOGE(TAG, "sprintf send thead name failed");
1234 }
1235 SetThreadName(name);
1236 }
1237
/*
 * Start-up context passed as the thread argument to DFileAddiSenderHandle,
 * which copies both fields and then frees the structure.
 */
typedef struct {
    struct DFileSession *session; /* session the send thread works for */
    uint32_t threadIdx; /* index of the thread; used to index session->sendThreadPara */
}SendThreadCtx;
1242
DFileAddiSenderHandle(void * arg)1243 static void *DFileAddiSenderHandle(void *arg)
1244 {
1245 SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
1246 struct DFileSession *session = sendThreadCtx->session;
1247 uint32_t threadIdx = sendThreadCtx->threadIdx;
1248 free(sendThreadCtx);
1249 int32_t ret = NSTACKX_EOK;
1250 DFILE_LOGI(TAG, "send thread %u start", threadIdx);
1251 SetSendThreadName(threadIdx);
1252 SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
1253 List unsent;
1254 uint8_t canWrite = NSTACKX_FALSE;
1255 uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1256 BindClientSendThreadToTargetCpu(threadIdx + 1);
1257 ListInitHead(&unsent);
1258 while (!session->addiSenderCloseFlag) {
1259 if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
1260 NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
1261 SemWait(&session->sendThreadPara[threadIdx].sendWait);
1262 if (session->addiSenderCloseFlag) {
1263 break;
1264 }
1265 }
1266 if (ret == NSTACKX_EAGAIN) {
1267 ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
1268 if (ret != NSTACKX_EOK || session->closeFlag) {
1269 break;
1270 }
1271 if (!canWrite) {
1272 ret = NSTACKX_EAGAIN;
1273 continue;
1274 }
1275 }
1276 ret = SendDataFrame(session, &unsent, threadIdx, 0);
1277 if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
1278 break;
1279 }
1280 if (ret == NSTACKX_EAGAIN) {
1281 continue;
1282 }
1283 SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
1284 }
1285 if (ret < 0 && ret != NSTACKX_EAGAIN) {
1286 PostFatalEvent(session);
1287 }
1288 DestroyIovList(&unsent, session, threadIdx);
1289 return NULL;
1290 }
1291
CloseAddiSendThread(struct DFileSession * session)1292 static void CloseAddiSendThread(struct DFileSession *session)
1293 {
1294 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1295 return;
1296 }
1297 session->addiSenderCloseFlag = NSTACKX_TRUE;
1298 for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1299 SemPost(&session->sendThreadPara[i].sendWait);
1300 SemPost(&session->sendThreadPara[i].semNewCycle);
1301 PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1302 SemDestroy(&session->sendThreadPara[i].sendWait);
1303 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1304 }
1305 }
1306
ErrorHandle(struct DFileSession * session,uint16_t cnt)1307 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1308 {
1309 while (cnt > 0) {
1310 SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1311 SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1312 PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1313 SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1314 SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1315 cnt--;
1316 }
1317 }
1318
CreateAddiSendThread(struct DFileSession * session)1319 static int32_t CreateAddiSendThread(struct DFileSession *session)
1320 {
1321 uint16_t i;
1322 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1323 return NSTACKX_EOK;
1324 }
1325 session->addiSenderCloseFlag = NSTACKX_FALSE;
1326 for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1327 SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1328 if (sendThreadCtx == NULL) {
1329 goto L_ERR_SENDER_THREAD;
1330 }
1331
1332 if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1333 free(sendThreadCtx);
1334 goto L_ERR_SENDER_THREAD;
1335 }
1336
1337 if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1338 SemDestroy(&session->sendThreadPara[i].sendWait);
1339 free(sendThreadCtx);
1340 goto L_ERR_SENDER_THREAD;
1341 }
1342
1343 sendThreadCtx->session = session;
1344 sendThreadCtx->threadIdx = i;
1345 if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1346 DFILE_LOGE(TAG, "Create sender thread failed");
1347 SemDestroy(&session->sendThreadPara[i].sendWait);
1348 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1349 free(sendThreadCtx);
1350 goto L_ERR_SENDER_THREAD;
1351 }
1352 }
1353 return NSTACKX_EOK;
1354
1355 L_ERR_SENDER_THREAD:
1356 session->addiSenderCloseFlag = NSTACKX_TRUE;
1357 ErrorHandle(session, i);
1358 return NSTACKX_EFAILED;
1359 }
1360
/*
 * Fold one qdisc sample into the peer's statistics: update the minimum
 * (a stored minimum of 0 is treated as "not set yet"), the maximum, the
 * sample count and the running sum kept in qdiscAveLeft.
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }

    if (peerInfo->qdiscMaxLeft < qDiscLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1374
/*
 * Limit the peer's amended send rate by the value GetQdiscLen reports for the
 * root queue of the peer's local interface.
 *
 * Nothing changes when the qdisc query fails or mtuInuse is 0. Otherwise the
 * sample is recorded, the reported value is divided by the number of MTUs per
 * data frame (at least 1), and amendSendRate becomes the smaller of sendRate
 * and that quotient.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t qdiscFree;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qdiscFree) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, qdiscFree);

    uint32_t rateCap = qdiscFree / mtusPerFrame;
    if (peerInfo->sendRate >= rateCap) {
        peerInfo->amendSendRate = (int32_t)rateCap;
    } else {
        peerInfo->amendSendRate = peerInfo->sendRate;
    }
}
1398
/*
 * Close the current send cycle for a socket.
 *
 * Marks the cycle as not running. If the frames sent in this interval reached
 * curAmendSendRate (and that rate is positive), the thread is paced through
 * CheckElapseTime; a pending decreaseStatus first clears the carried-over
 * overRun. A cycle that ends without overrun is counted in
 * session->sleepTimes. Finally the interval counter is reset and the amended
 * send rate is refreshed from the qdisc state.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    const int quotaReached =
        (curAmendSendRate > 0 && peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate);
    if (quotaReached) {
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->overRun = 0;
            peerInfo->decreaseStatus = 0;
        }
        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1417
/*
 * Run one send step of the main sender thread for a socket.
 *
 * 1. When CapsTcp(session) holds and "unsent" is not empty, the leftover data
 *    is sent first with session->sendRemain raised; NSTACKX_EFAILED,
 *    NSTACKX_EAGAIN or a set closeFlag returns immediately.
 * 2. Outbound (control) frames are sent. Non-client sessions stop here.
 * 3. For a client whose peer has a non-zero mtuInuse, data frames are sent if
 *    step 2 succeeded, the send rate statistics are refreshed, and on success
 *    the cycle is closed (pacing, counters) and every additional send thread
 *    is released for a new cycle through its semNewCycle semaphore.
 *
 * Returns the result of the last send call, or NSTACKX_EFAILED when a client
 * session has no peer for socketIndex.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
    struct timespec *before, uint8_t socketIndex)
{
    int32_t sendRet;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        session->sendRemain = 1;
        sendRet = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if ((sendRet == NSTACKX_EFAILED || sendRet == NSTACKX_EAGAIN) || session->closeFlag) {
            DFILE_LOGI(TAG, "ret is %d", sendRet);
            return sendRet;
        }
    }

    sendRet = SendOutboundFrame(session, preQueueNode);
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return sendRet;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    if (peer->mtuInuse == 0) {
        return sendRet;
    }

    if (sendRet == NSTACKX_EOK) {
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        sendRet = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }

    /* Snapshot the rate before DFileSendCalculateRate may reset it. */
    int32_t rateBeforeRecalc = peer->amendSendRate;
    DFileSendCalculateRate(session, peer);

    if ((sendRet == NSTACKX_EFAILED || sendRet == NSTACKX_EAGAIN) || session->closeFlag) {
        DFILE_LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", sendRet, peer->intervalSendCount);
        return sendRet;
    }

    UpdateInfoAfterSend(session, peer, rateBeforeRecalc, before, socketIndex);

    /* Release the additional send threads for the next cycle. */
    for (uint16_t worker = 0; worker < session->clientSendThreadNum - 1; worker++) {
        SemPost(&session->sendThreadPara[worker].semNewCycle);
    }
    return sendRet;
}
1467
/*
 * Block on outboundQueueWait[socketIndex] when there is nothing to send:
 * no held queue node, an empty unsent list, no pending file data and an empty
 * outbound queue. Each wait is counted in session->noPendingDataTimes.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL || !ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager) || session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1476
/*
 * One-time setup of the main sender thread: name it, restart the client
 * measurement window, raise its priority and record its tid in the bind info.
 * The tid is only recorded when the computed slot lies below
 * POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t bindSlot = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    if (bindSlot < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, bindSlot);
    }
}
1489
/*
 * Shutdown path of the main sender thread: log the total number of sent
 * blocks, stop the additional send threads, destroy the unsent iov list and
 * the held queue node, and free the thread argument.
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    DFILE_LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    /* Stop the helpers first, then release what this thread still holds. */
    CloseAddiSendThread(session);
    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    free(arg);
}
1499
DFileSenderHandle(void * arg)1500 void *DFileSenderHandle(void *arg)
1501 {
1502 DFileSession *session = ((SenderThreadPara*)arg)->session;
1503 uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1504 QueueNode *queueNode = NULL;
1505 List unsent;
1506 int32_t ret = NSTACKX_EOK;
1507 struct timespec before;
1508 uint8_t canWrite = NSTACKX_FALSE;
1509 uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1510 uint8_t isBind = NSTACKX_FALSE;
1511
1512 if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1513 PostFatalEvent(session);
1514 return NULL;
1515 }
1516 ListInitHead(&unsent);
1517 DFileSenderPre(session, socketIndex);
1518 while (!session->closeFlag) {
1519 WaitNewSendData(queueNode, &unsent, session, socketIndex);
1520 if (session->closeFlag) {
1521 break;
1522 }
1523 if (!session->cycleRunning[socketIndex]) {
1524 ClockGetTime(CLOCK_MONOTONIC, &before);
1525 }
1526 if (ret == NSTACKX_EAGAIN) {
1527 ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1528 if (ret != NSTACKX_EOK || session->closeFlag) {
1529 break;
1530 }
1531 if (!canWrite) {
1532 ret = NSTACKX_EAGAIN;
1533 continue;
1534 }
1535 }
1536 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1537 session->transFlag == NSTACKX_TRUE) {
1538 BindClientSendThreadToTargetCpu(0);
1539 isBind = NSTACKX_TRUE;
1540 }
1541 ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1542 if (ret < 0 && ret != NSTACKX_EAGAIN) {
1543 PeerShuttedEvent();
1544 PostFatalEvent(session);
1545 break;
1546 }
1547 }
1548 DFileSenderClose(session, queueNode, &unsent, arg);
1549 return NULL;
1550 }
1551
/*
 * Remove and destroy every QueueNode linked into "head". A NULL or empty list
 * is accepted and left untouched.
 */
static void ClearDFileFrameList(List *head)
{
    if (head == NULL) {
        return;
    }
    if (ListIsEmpty(head)) {
        return;
    }

    List *cursor = NULL;
    List *next = NULL;
    LIST_FOR_EACH_SAFE(cursor, next, head) {
        QueueNode *stale = (QueueNode *)cursor;
        ListRemoveNode(&stale->list);
        DestroyQueueNode(stale);
    }
}
1565
CheckDfileType(uint8_t type)1566 static inline int32_t CheckDfileType(uint8_t type)
1567 {
1568 if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1569 return NSTACKX_EFAILED;
1570 }
1571 return NSTACKX_EOK;
1572 }
1573
/*
 * Validate a session type: NSTACKX_EOK when it lies in
 * [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER], otherwise
 * NSTACKX_EFAILED.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    if (type >= DFILE_SESSION_TYPE_CLIENT && type <= DFILE_SESSION_TYPE_SERVER) {
        return NSTACKX_EOK;
    }
    return NSTACKX_EFAILED;
}
1581
/*
 * Drain a list of received frames and dispatch each one by type.
 *
 * Processing stops early when session->closeFlag is set; whatever is still on
 * the list is destroyed at the end. For every node:
 *   - a header length larger than NSTACKX_MAX_FRAME_SIZE minus the header
 *     size drops the node;
 *   - an invalid frame type or session type marks the frame as failed;
 *   - SETTING, RST (with transId 0) and BACK_PRESSURE frames go to their
 *     session-level handlers;
 *   - everything else goes to DFileSessionHandleFrame.
 * The frame buffer is freed here unless it is a FILE_DATA frame that was
 * handled successfully. The node itself is always freed.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    int32_t frameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }

        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *sender = &node->peerAddr;
        /* Cached up front: the frame may no longer be valid after handling. */
        uint8_t frameType = frame->header.type;

        if (ntohs(frame->header.length) > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            DFILE_LOGE(TAG, "header length %u is too big", ntohs(frame->header.length));
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(frame->header.type) != NSTACKX_EOK ||
            CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            frameRet = NSTACKX_EFAILED;
        } else if (frame->header.type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, sender, node->socketIndex);
        } else if (frame->header.type == NSTACKX_DFILE_RST_FRAME &&
            frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, sender);
        } else if (frame->header.type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, sender);
        } else {
            /*
             * For NSTACKX_DFILE_FILE_DATA_FRAME, "dFileFrame" may be free when handling failed, and become invalid.
             */
            frameRet = DFileSessionHandleFrame(session, frame, sender);
        }

        if (frameRet != NSTACKX_EOK || frameType != NSTACKX_DFILE_FILE_DATA_FRAME) {
            /* For FILE_DATA frame, the memory is passed to file manager. */
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }

    ClearDFileFrameList(head);
}
1628
ReadEventHandle(void * arg)1629 static void ReadEventHandle(void *arg)
1630 {
1631 DFileSession *session = arg;
1632 List newHead;
1633 ListInitHead(&newHead);
1634 struct timespec before, now;
1635 if (!session->partReadFlag) {
1636 NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
1637 } else {
1638 ClockGetTime(CLOCK_MONOTONIC, &before);
1639 }
1640 while (session->inboundQueueSize && !session->closeFlag) {
1641 if (session->partReadFlag) {
1642 ClockGetTime(CLOCK_MONOTONIC, &now);
1643 if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
1644 break;
1645 }
1646 }
1647 if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
1648 DFILE_LOGE(TAG, "PthreadMutexLock error");
1649 return;
1650 }
1651 ListMove(&session->inboundQueue, &newHead);
1652 session->recvBlockNumInner += session->inboundQueueSize;
1653 session->inboundQueueSize = 0;
1654 if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
1655 DFILE_LOGE(TAG, "PthreadMutexUnlock error");
1656 break;
1657 }
1658 ProcessDFileFrameList(session, &newHead);
1659 }
1660 ClearDFileFrameList(&newHead);
1661 }
1662
/*
 * Wrap a received frame in a queue node and append it to the session's inbound queue.
 *
 * Returns:
 *   NSTACKX_EOK     - the node was queued.
 *   NSTACKX_ENOMEM  - the queue already holds more than MAX_RECVBUF_COUNT nodes, or
 *                     CreateQueueNode() failed.
 *   NSTACKX_EFAILED - locking or unlocking inboundQueueLock failed. On unlock failure
 *                     the node has already been inserted and stays in the queue.
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
        /* Rate-limit the log: only print when the size is a multiple of MAX_NOMEM_PRINT. */
        if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
            DFILE_LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
        }
        return NSTACKX_ENOMEM;
    }

    QueueNode *node = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    if (node == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        /* Not queued yet, so the node is still ours to release. */
        DestroyQueueNode(node);
        return NSTACKX_EFAILED;
    }

    ListInsertTail(&session->inboundQueue, &node->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* From here on the node belongs to the queue; it must not be destroyed on failure. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
1692
/*
 * For server-type sessions, stamp session->measureBefore with the current monotonic time.
 * Sessions of any other type are left untouched.
 */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1699
/*
 * Bind the calling thread to a CPU core chosen from the session type and the CPU count.
 *
 * Non-server sessions always bind to CPU_IDX_2. Server sessions pick the core from the
 * value returned by GetCpuNum():
 *   >= FIRST_CPU_NUM_LEVEL   - no binding
 *   >= SECOND_CPU_NUM_LEVEL  - CPU_IDX_1
 *   >= THIRD_CPU_NUM_LEVEL   - CPU_IDX_0
 *   otherwise                - no binding
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t targetCpu;
    int32_t cpuCount = GetCpuNum();

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        targetCpu = CPU_IDX_2;
        StartThreadBindCore(targetCpu);
        return;
    }

    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_1;
        StartThreadBindCore(targetCpu);
        return;
    }
    if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        targetCpu = CPU_IDX_0;
        StartThreadBindCore(targetCpu);
    }
}
1720
/*
 * Post a ReadEventHandle event for this session, unless MAX_UNPROCESSED_READ_EVENT_COUNT
 * read events are already outstanding.
 *
 * unprocessedReadEventCount is incremented before posting and rolled back if PostEvent()
 * fails. mainLoopActiveReadFlag is set to NSTACKX_TRUE when posting failed and to
 * NSTACKX_FALSE when it succeeded.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    /* Posting failed: undo the accounting and raise the flag. */
    DFILE_LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1735
/*
 * Validate a received buffer, queue it for processing and update the receive-rate statistics.
 *
 * Returns NSTACKX_EOK when the frame was queued, and also when it was dropped because it
 * failed to decode or because queueing reported NSTACKX_ENOMEM. Returns NSTACKX_EFAILED
 * for any other queueing error.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *decodedFrame = NULL;

    if (DecodeDFileFrame(buf, bufLen, &decodedFrame) != NSTACKX_EOK) {
        /* A frame that cannot be decoded is dropped without failing the caller. */
        DFILE_LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    int32_t queueRet = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    if (queueRet == NSTACKX_EOK) {
        PostReadEventToMainLoop(session);
        DFileRecvCalculateRate(session, decodedFrame, peerAddr);
        return NSTACKX_EOK;
    }

    if (queueRet != NSTACKX_ENOMEM) {
        DFILE_LOGE(TAG, "frame add in bound queue failed :%d", queueRet);
        return NSTACKX_EFAILED;
    }

    /* NSTACKX_ENOMEM: the frame is dropped and the drop is not reported to the caller. */
    return NSTACKX_EOK;
}
1758
/*
 * One-time setup run at the start of the receiver thread (called from DFileReceiverHandle):
 * names the thread, updates the session measure time, raises the thread priority and
 * records the thread in the session's bind info.
 */
static void DFileRecverPre(DFileSession *session)
{
    /* Thread identity first. */
    SetThreadName(DFFILE_RECV_THREAD_NAME);

    /* Session-level timing reference (only takes effect for server sessions). */
    DFileReceiverUpdateSessionMeasureTime(session);

    /* Scheduling setup for the calling thread. */
    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1766
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for the receiver's socket to become readable.
 *
 * The accepted socket is used once session->acceptFlag is non-zero; before that,
 * socket[0] is used. The readable state is reported through canRead, and the return
 * value is whatever WaitSocketEvent() returns.
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    if (session->acceptFlag != 0) {
        return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
    }
    return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1775
/*
 * Receive pending data for the session. This is a thin forwarder to DFileSocketRecvSP();
 * its return value is passed through unchanged.
 */
int32_t DFileSocketRecv(DFileSession *session)
{
    int32_t recvRet = DFileSocketRecvSP(session);
    return recvRet;
}
1780
/*
 * Accept a connection on socket[0] and store the result in session->acceptSocket.
 *
 * On success, acceptFlag is set to 1, TCP keep-alive is enabled on the accepted socket
 * and AcceptSocketEvent() is raised.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_EFAILED when AcceptSocket() returns NULL.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        DFILE_LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    DFILE_LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);

    AcceptSocketEvent();
    return NSTACKX_EOK;
}
1797
DFileReceiverHandle(void * arg)1798 void *DFileReceiverHandle(void *arg)
1799 {
1800 DFileSession *session = arg;
1801 uint8_t canRead = NSTACKX_FALSE;
1802 int32_t ret = NSTACKX_EAGAIN;
1803 uint8_t isBind = NSTACKX_FALSE;
1804
1805 DFILE_LOGI(TAG, "recv thread start");
1806 DFileRecverPre(session);
1807 while (!session->closeFlag) {
1808 if (ret == NSTACKX_EAGAIN || !canRead) {
1809 ret = RcverWaitSocket(session, &canRead);
1810 if (ret != NSTACKX_EOK || session->closeFlag) {
1811 break;
1812 }
1813 }
1814 if (!canRead) {
1815 continue;
1816 }
1817 if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1818 BindServerRecvThreadToTargetCpu(session);
1819 isBind = NSTACKX_TRUE;
1820 }
1821
1822 ret = DFileSocketRecv(session);
1823 if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1824 PeerShuttedEvent();
1825 break;
1826 }
1827 }
1828 DFILE_LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect,
1829 session->recvBlockNumInner);
1830 if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1831 PostFatalEvent(session);
1832 }
1833
1834 DFILE_LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1835 return NULL;
1836 }
1837
DFileControlHandle(void * arg)1838 void *DFileControlHandle(void *arg)
1839 {
1840 SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1841 DFileSession *session = arg;
1842 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1843 DFileSenderControlHandle(session);
1844 } else {
1845 DFileReceiverControlHandle(session);
1846 }
1847 return NULL;
1848 }
1849
/*
 * Replace every entry of fileListInfo->files[] with its canonical path from realpath().
 *
 * Each original string is freed once its replacement has been obtained. Processing stops
 * at the first entry realpath() cannot resolve; entries before it have already been
 * replaced, and that entry and the ones after it are left untouched.
 *
 * Returns NSTACKX_EOK when all entries were resolved, NSTACKX_EFAILED otherwise.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    uint32_t idx;

    for (idx = 0; idx < fileListInfo->fileNum; idx++) {
        char *originalPath = fileListInfo->files[idx];
        char *resolvedPath = realpath(originalPath, NULL);
        if (resolvedPath == NULL) {
            DFILE_LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        /* realpath(path, NULL) returns a malloc'ed buffer, which the list now owns. */
        free(originalPath);
        fileListInfo->files[idx] = resolvedPath;
    }

    return NSTACKX_EOK;
}
1869
/*
 * Release a FileListInfo container: the files[] array, the remotePath string and the
 * structure itself. The strings referenced by files[] are NOT freed here.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    /* free(NULL) is a no-op, so remotePath needs no separate NULL check. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo->files);
    fileListInfo->files = NULL;

    free(fileListInfo);
}
1880
/*
 * Create a sending transfer for fileListInfo and append it to the session's transfer chain.
 *
 * The new transfer id is lastDFileTransId + 1, skipping 0 on wrap-around. On success the
 * fileListInfo container is released with FreeTransFileListInfo() and must not be used by
 * the caller afterwards. On failure fileListInfo is not freed here.
 *
 * Returns:
 *   NSTACKX_EOK     - the transfer was created and queued.
 *   NSTACKX_ENOMEM  - CreateTrans() failed.
 *   NSTACKX_EFAILED - no peer could be selected, path resolution failed, or adding the
 *                     extra info failed.
 *   other           - the error returned by DFileTransSendFiles().
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    uint16_t transId = session->lastDFileTransId + 1;
    if (transId == 0) { /* overflow */
        transId = 1;
    }

    PeerInfo *peerInfo = TransSelectPeerInfo(session);
    if (peerInfo == NULL) {
        /* peerInfo->mtuInuse is read below, so a missing peer must be rejected up front. */
        DFILE_LOGE(TAG, "select peer info failed");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
    if (trans == NULL) {
        DFILE_LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to apply the MTU is only logged; the transfer continues. */
    if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    int32_t ret = DFileTransSendFiles(trans, fileListInfo);
    if (ret != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        DFILE_LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
        return ret;
    }
    ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
        fileListInfo->userData);
    if (ret != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    trans->fileList->tarFlag = fileListInfo->tarFlag;
    trans->fileList->smallFlag = fileListInfo->smallFlag;
    trans->fileList->tarFile = fileListInfo->tarFile;
    trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;

    /* userData was handed to DFileTransAddExtraInfo() above; drop this reference to it. */
    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    session->lastDFileTransId = transId;
    if (fileListInfo->smallFlag == NSTACKX_TRUE) {
        session->smallListProcessingCnt++;
    } else {
        session->fileListProcessingCnt++;
    }
    /* Elements in fileListInfo->files[] are reused by the DFileTrans, so they are not freed here. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1932
/*
 * Start a new transfer for fileListInfo while holding session->transIdLock.
 *
 * Returns NSTACKX_EFAILED if the lock cannot be taken; otherwise returns the result of
 * DFileStartTransInner(). An unlock failure is logged but does not change the result.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    int32_t startRet;

    if (PthreadMutexLock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    startRet = DFileStartTransInner(session, fileListInfo);

    if (PthreadMutexUnlock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1945
TerminateMainThreadInner(void * arg)1946 void TerminateMainThreadInner(void *arg)
1947 {
1948 DFileSession *session = (DFileSession *)arg;
1949 DFileSessionSetTerminateFlag(session);
1950 }
1951
/*
 * Start the session's worker threads in order: main loop, sender, receiver, control.
 *
 * If a thread cannot be created, the threads started before it are torn down in reverse
 * order via the fall-through labels below: the terminate flag is set, each started thread
 * is joined, and its id is reset to INVALID_TID.
 *
 * Returns NSTACKX_EOK when all threads were started, NSTACKX_EFAILED otherwise.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        DFILE_LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        DFILE_LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        DFILE_LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;

L_ERR_CONTROL_THREAD:
    /*
     * The control thread was never created on this path, so controlTid is not joinable.
     * The thread to reap here is the receiver, which is already running.
     */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
    PostOutboundQueueWait(session);
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post a terminate event to the main loop before joining it. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1992
/*
 * Message callback registered with the file manager (see CreateFileManager()).
 *
 * FILE_MANAGER_INNER_ERROR is logged with errCode and escalated via PostFatalEvent();
 * FILE_MANAGER_IN_PROGRESS triggers NoticeSessionProgress(). Other message types are
 * ignored.
 *
 * context: the DFileSession that owns the file manager.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            DFILE_LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
2005
/*
 * Create the session's file manager and store it in session->fileManager.
 *
 * Validation performed before creation:
 *   - session must not be NULL;
 *   - a sender must use CONNECT_TYPE_P2P or CONNECT_TYPE_WLAN;
 *   - when keyLen > 0, key must be non-NULL, keyLen must be AES_128_KEY_LENGTH or
 *     CHACHA20_KEY_LENGTH, and crypto support must be available.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_EINVAL for a NULL session or an illegal sender
 * connType, and NSTACKX_EFAILED for key/crypto problems or when FileManagerCreate() fails.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    FileManagerMsgPara msgPara;

    if (session == NULL) {
        DFILE_LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }

    if (isSender && connType != CONNECT_TYPE_P2P && connType != CONNECT_TYPE_WLAN) {
        DFILE_LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }

    if (keyLen > 0) {
        uint8_t keyLenSupported = (keyLen == AES_128_KEY_LENGTH || keyLen == CHACHA20_KEY_LENGTH);
        if (!keyLenSupported || key == NULL) {
            DFILE_LOGE(TAG, "error key or key len");
            return NSTACKX_EFAILED;
        }
        if (!IsCryptoIncluded()) {
            DFILE_LOGE(TAG, "crypto is not opened");
            return NSTACKX_EFAILED;
        }
    }

    /* File manager messages are delivered to FileManagerMsgHandle with the session as context. */
    msgPara.msgReceiver = FileManagerMsgHandle;
    msgPara.context = session;
    msgPara.epollfd = session->epollfd;
    msgPara.eventNodeChain = &session->eventNodeChain;

    session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
    if (session->fileManager != NULL) {
        return NSTACKX_EOK;
    }

    DFILE_LOGE(TAG, "create filemanager failed");
    return NSTACKX_EFAILED;
}
2039
/*
 * Close both ends of session->receiverPipe (PIPE_OUT first, then PIPE_IN) and mark each
 * closed end as INVALID_PIPE_DESC. Ends that are already invalid are skipped.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const int32_t pipeEnds[] = { PIPE_OUT, PIPE_IN };
    uint32_t i;

    for (i = 0; i < sizeof(pipeEnds) / sizeof(pipeEnds[0]); i++) {
        int32_t end = pipeEnds[i];
        if (session->receiverPipe[end] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[end]);
        session->receiverPipe[end] = INVALID_PIPE_DESC;
    }
}
2051
/*
 * Install func as the file-wide DFile event callback (g_dfileEventFunc).
 * softObj is accepted but not used.
 */
void DFileSetEvent(void *softObj, DFileEventFunc func)
{
    (void)softObj;
    g_dfileEventFunc = func;
}
2057
NSTACKX_DFileAssembleFunc(void * softObj,const DFileEvent * info)2058 void NSTACKX_DFileAssembleFunc(void *softObj, const DFileEvent *info)
2059 {
2060 if (g_dfileEventFunc != NULL) {
2061 g_dfileEventFunc(softObj, info);
2062 }
2063 }
2064