1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_session.h"
17
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_dfile_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 #include "nstackx_dfile_dfx.h"
38
/* Tag passed as the first argument to the DFILE_LOG* macros in this file. */
#define TAG "nStackXDFile"

/* NOTE(review): not referenced in the visible part of this file; presumably milliseconds - confirm. */
#define DEFAULT_WAIT_TIME_MS 1000
/* NOTE(review): not referenced in the visible part of this file; presumably milliseconds - confirm. */
#define MULTI_THREADS_SOCKET_WAIT_TIME_MS 1
/* Setting-negotiation timer value used when CapsTcp(session) is false. */
#define DEFAULT_NEGOTIATE_TIMEOUT 1000
/* Setting-negotiation timer value used when CapsTcp(session) is true. */
#define TCP_NEGOTIATE_TIMEOUT 10000
/* Timer value armed on the server side for a peer's setting negotiation (see ServerSettingTimeoutHandle). */
#define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
/* Bound on client setting time-outs and on the settings a server answers for one peer. */
#define MAX_NEGOTIATE_TIMEOUT_COUNT 3
/* NOTE(review): not referenced in the visible part of this file. */
#define MAX_UNPROCESSED_READ_EVENT_COUNT 3
/* NOTE(review): not referenced in the visible part of this file. */
#define MAX_RECVBUF_COUNT 130000
/* NOTE(review): not referenced in the visible part of this file. */
#define MAX_NOMEM_PRINT 10000

/* Forward declarations of static functions defined outside the visible part of this file. */
static void ReadEventHandle(void *arg);
static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
/* File-scope event callback holder; NOTE(review): who sets and invokes it is not visible here. */
DFileEventFunc g_dfileEventFunc;
54
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)55 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
56 const struct sockaddr_in *peerAddr, uint8_t socketIndex)
57 {
58 QueueNode *queueNode = NULL;
59
60 if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
61 return NULL;
62 }
63
64 queueNode = calloc(1, sizeof(QueueNode));
65 if (queueNode == NULL) {
66 return NULL;
67 }
68
69 queueNode->frame = calloc(1, length);
70 if (queueNode->frame == NULL) {
71 free(queueNode);
72 return NULL;
73 }
74 queueNode->length = length;
75 queueNode->sendLen = 0;
76
77 if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
78 DestroyQueueNode(queueNode);
79 return NULL;
80 }
81 if (peerAddr != NULL) {
82 if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
83 DestroyQueueNode(queueNode);
84 return NULL;
85 }
86 }
87 queueNode->socketIndex = socketIndex;
88
89 return queueNode;
90 }
91
/*
 * Free a QueueNode together with the frame buffer it owns.
 * Passing NULL is allowed and does nothing.
 */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }
    free(queueNode->frame);
    free(queueNode);
}
99
/*
 * Forward a message to the session's registered msgReceiver callback.
 *
 * The callback receives the session id, the message type and the message
 * pointer. When the session or its callback is NULL the message is dropped
 * and an informational log line is written.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        DFILE_LOGI(TAG, "session is NULL");
    } else if (session->msgReceiver == NULL) {
        DFILE_LOGI(TAG, "msgReceiver is NULL");
    } else {
        session->msgReceiver(session->sessionId, msgType, msg);
    }
}
114
/*
 * Reset the session's transfer-rate counters and record the start time.
 *
 * Nothing is reset while the session still has work: a client session with
 * queued file lists (pending, plus small lists when
 * NSTACKX_SMALL_FILE_SUPPORT is defined), or any session whose
 * dFileTransChain is not empty.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    DFILE_LOGI(TAG, "begin to calculate transfer rate");
    session->transCount = 0;
    session->bytesTransferred = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
135
136 #ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start a transfer from the queue of small file lists.
 *
 * The queue is only drained when a regular file list is already being
 * processed or no regular file list is pending. Entries are popped until one
 * is started successfully; each entry that fails to start is reported through
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    if (session->fileListProcessingCnt == 0 && session->fileListPendingCnt != 0) {
        return NSTACKX_FALSE;
    }

    while (!ListIsEmpty(&session->smallFileLists)) {
        FileListInfo *smallList = (FileListInfo *)ListPopFront(&session->smallFileLists);
        session->smallListPendingCnt--;
        if (smallList == NULL) {
            continue;
        }

        int32_t startRet = DFileStartTrans(session, smallList);
        if (startRet == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", startRet);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = startRet;
        failMsg.fileList.files = (const char **)smallList->files;
        failMsg.fileList.fileNum = smallList->fileNum;
        failMsg.fileList.userData = smallList->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        /* Destroyed only after the notification, which still points at its file names. */
        DestroyFileListInfo(smallList);
    }
    return NSTACKX_FALSE;
}
164 #endif
165
/*
 * Start the next transfer from the queue of pending file lists.
 *
 * Entries are popped until one is started successfully. Each entry that
 * fails to start is reported through DFILE_ON_FILE_SEND_FAIL and destroyed.
 */
static void SendPendingList(DFileSession *session)
{
    while (!ListIsEmpty(&session->pendingFileLists)) {
        FileListInfo *pendingList = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        session->fileListPendingCnt--;
        if (pendingList == NULL) {
            continue;
        }

        int32_t startRet = DFileStartTrans(session, pendingList);
        if (startRet == NSTACKX_EOK) {
            return;
        }

        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", startRet);
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = startRet;
        failMsg.fileList.files = (const char **)pendingList->files;
        failMsg.fileList.fileNum = pendingList->fileNum;
        failMsg.fileList.userData = pendingList->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        /* Destroyed only after the notification, which still points at its file names. */
        DestroyFileListInfo(pendingList);
    }
}
190
/*
 * Kick off the next queued transfer for the session.
 *
 * With NSTACKX_SMALL_FILE_SUPPORT the small-list queue is tried first and the
 * pending queue is used only when no small list was started; without it only
 * the pending queue exists.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
206
/*
 * Report session-wide progress through DFILE_ON_SESSION_IN_PROGRESS.
 *
 * The notification is sent only when both counters can be read from the file
 * manager, at least one byte has been transferred and the transferred count
 * does not exceed the total.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }
    if (progress.transferUpdate.bytesTransferred > progress.transferUpdate.totalBytes ||
        progress.transferUpdate.bytesTransferred == 0) {
        return;
    }
    NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
}
218
/*
 * Fill the transferUpdate part of a trans message for terminal events.
 *
 * Only FILE_RECEIVED, FILE_SENT, FILE_RECEIVE_FAIL and FILE_SEND_FAIL are
 * handled; every other message type (or a NULL argument) leaves msg untouched.
 * For the two success events the trans' own total is used when it is non-zero;
 * otherwise the counters are read from the file manager, and nothing further
 * is written when that read fails. For FILE_SENT a DFILE_ON_TRANS_IN_PROGRESS
 * notification is also emitted, using the real trans id when the file list
 * has vtransFlag set.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
    DFileTransMsgType msgType, DFileTransMsg *msg)
{
    const int isSuccess = (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT);
    const int isFailure = (msgType == DFILE_TRANS_MSG_FILE_RECEIVE_FAIL ||
        msgType == DFILE_TRANS_MSG_FILE_SEND_FAIL);

    if (session == NULL || dFileTrans == NULL || msg == NULL || (!isSuccess && !isFailure)) {
        return;
    }

    uint64_t total = 0;
    uint64_t transferred = 0;

    msg->transferUpdate.transId = dFileTrans->transId;

    if (isSuccess) {
        total = DFileTransGetTotalBytes(dFileTrans);
    }
    if (isSuccess && total > 0) {
        /* A finished trans with a known size has transferred all of it. */
        transferred = total;
    } else if (FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &transferred) !=
        NSTACKX_EOK) {
        return;
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = transferred;
    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
257
/*
 * Post the semaphores that sending threads wait on.
 *
 * The outbound-queue semaphore of the given socket is always posted. When the
 * caller is a sender and more than one client send thread is configured, the
 * sendWait semaphore of the first (clientSendThreadNum - 1) sendThreadPara
 * entries is posted as well.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);

    if (!isSender || session->clientSendThreadNum <= 1) {
        return;
    }
    for (uint16_t threadIdx = 0; threadIdx < session->clientSendThreadNum - 1; threadIdx++) {
        SemPost(&session->sendThreadPara[threadIdx].sendWait);
    }
}
267
/*
 * Account for a finished trans and, once the session is idle, publish the
 * average transfer rate.
 *
 * Only FILE_SENT and END messages are counted. totalBytes is added to the
 * session counter with saturation at UINT64_MAX. The rate is reported (log,
 * DFILE_ON_SESSION_TRANSFER_RATE notification and TransferCompleteEvent) only
 * when no trans is left in dFileTransChain, a client session has no queued
 * file lists, and the measured duration is non-zero.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }
    if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
        session->bytesTransferred += totalBytes;
    } else {
        session->bytesTransferred = UINT64_MAX;
    }
    session->transCount++;
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
        || !ListIsEmpty(&session->smallFileLists))) {
        return;
    }
#else
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
        return;
    }
#endif
    struct timespec endTs;
    ClockGetTime(CLOCK_MONOTONIC, &endTs);

    uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
    if (spendTime == 0) {
        /* Avoid dividing by zero for transfers shorter than the clock resolution. */
        return;
    }
    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
    /* %llu requires unsigned long long; uint64_t may be unsigned long, so cast explicitly. */
    DFILE_LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
        session->transCount, (unsigned long long)session->bytesTransferred, spendTime, rate);
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    /* Converting an out-of-range double to uint32_t is undefined, so saturate instead. */
    msgData.rate = (rate >= (double)UINT32_MAX) ? UINT32_MAX : (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);

    TransferCompleteEvent(rate);
}
309
/*
 * Tear down a trans once it has reported a terminal event.
 *
 * Terminal events are FILE_RECEIVE_FAIL, FILE_SENT, FILE_SEND_FAIL and END;
 * any other message type is ignored. The trans id is marked done, the peer's
 * running trans counter is decremented, and the trans is unlinked and
 * destroyed. A client session then updates its processing counters and
 * starts the next queued file list. Finally the session rate accounting is
 * updated with the trans' total byte count.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_RECEIVE_FAIL && msgType != DFILE_TRANS_MSG_FILE_SENT &&
        msgType != DFILE_TRANS_MSG_FILE_SEND_FAIL && msgType != DFILE_TRANS_MSG_END) {
        return;
    }

    /* Everything needed from the trans is read before it is destroyed. */
    uint8_t wasSmallList = dFileTrans->fileList->smallFlag;
    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans id state fail");
    }
    PeerInfo *owner = (PeerInfo *)dFileTrans->context;
    owner->currentTransCount--;
    ListRemoveNode(&dFileTrans->list);
    uint64_t transBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (wasSmallList == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }
    CalculateSessionTransferRate(session, transBytes, msgType);
}
333
/*
 * Callback installed on every DFileTrans (see CreateTrans) that turns trans
 * events into session-level actions.
 *
 * The message's progress fields are refreshed first, then the event is
 * either forwarded to the session's message receiver, used to wake the send
 * threads, or used to process the session's other transfers. Lastly the trans
 * is cleaned up if the event was terminal.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
    DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *owner = peer->session;

    UpdateMsgProcessInfo(owner, dFileTrans, msgType, msg);

    switch (msgType) {
        /* Events that drive sending rather than notify the user. */
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            WakeSendThread(owner, dFileTrans->isSender, peer->socketIndex);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            ProcessSessionTrans(owner, dFileTrans->transId);
            break;

        /* Sender-side results. */
        case DFILE_TRANS_MSG_FILE_SENT:
            NoticeSessionProgress(owner);
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_SEND_FAIL, msg);
            break;

        /* Receiver-side results. */
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            NotifyMsgRecver(owner, DFILE_ON_FILE_LIST_RECEIVED, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED:
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_SUCCESS, msg);
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            NotifyMsgRecver(owner, DFILE_ON_FILE_RECEIVE_FAIL, msg);
            break;

        case DFILE_TRANS_MSG_IN_PROGRESS:
            NotifyMsgRecver(owner, DFILE_ON_TRANS_IN_PROGRESS, msg);
            break;
        default:
            DFILE_LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            break;
    }

    /* May destroy dFileTrans; it must not be touched after this call. */
    CheckTransDone(owner, dFileTrans, msgType);
}
375
ServerSettingTimeoutHandle(void * data)376 static void ServerSettingTimeoutHandle(void *data)
377 {
378 PeerInfo *peerInfo = data;
379 ListRemoveNode(&peerInfo->list);
380 TimerDelete(peerInfo->settingTimer);
381 peerInfo->session->peerInfoCnt--;
382 free(peerInfo);
383 DFILE_LOGI(TAG, "DFileServer Setting Negotationion timeout");
384 }
385
ClientSettingTimeoutHandle(void * data)386 static void ClientSettingTimeoutHandle(void *data)
387 {
388 PeerInfo *peerInfo = data;
389 uint8_t cnt = peerInfo->settingTimeoutCnt++;
390 DFileMsg msgData;
391 uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
392 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
393 if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
394 TimerDelete(peerInfo->settingTimer);
395 peerInfo->settingTimer = NULL;
396 peerInfo->settingTimeoutCnt = 0;
397 msgData.errorCode = NSTACKX_EFAILED;
398 DFILE_LOGI(TAG, "DFileClient Setting Negotationion timeout");
399 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
400 } else {
401 DFileSessionSendSetting(peerInfo);
402 DFILE_LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
403 if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
404 msgData.errorCode = NSTACKX_EFAILED;
405 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
406 DFILE_LOGE(TAG, "Timer setting timer fail");
407 }
408 }
409 }
410
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)411 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
412 {
413 List *pos = NULL;
414 DFileTrans *trans = NULL;
415
416 if (dFileTransChain == NULL || transId == 0) {
417 return NULL;
418 }
419
420 LIST_FOR_EACH(pos, dFileTransChain) {
421 trans = (DFileTrans *)pos;
422 if (trans->transId == transId) {
423 return trans;
424 }
425 }
426 return NULL;
427 }
428
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)429 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
430 {
431 List *pos = NULL;
432 PeerInfo *peerInfo = NULL;
433 LIST_FOR_EACH(pos, &session->peerInfoChain) {
434 peerInfo = (PeerInfo *)pos;
435 if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
436 peerInfo->session->sessionId == session->sessionId) {
437 return peerInfo;
438 }
439 }
440 return NULL;
441 }
442
/*
 * Encode a setting frame for the peer and send it.
 *
 * If the peer has no setting timer yet, one is started first: a client uses
 * the TCP or default negotiate timeout with ClientSettingTimeoutHandle, a
 * server uses MAX_SERVER_NEGOTIATE_VALID_TIMEOUT with
 * ServerSettingTimeoutHandle. If the timer cannot be created nothing is sent;
 * a client additionally reports DFILE_ON_CONNECT_FAIL. Callers can detect the
 * failure through peerInfo->settingTimer still being NULL.
 *
 * A write result other than the full frame length or NSTACKX_EAGAIN is
 * reported as DFILE_ON_CONNECT_FAIL.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    SettingFrame settingFramePara;

    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    /*
     * Zero the whole parameter block: only some fields are assigned below and
     * the cipher-related ones only when a key is configured, so without this
     * the encoder could read indeterminate values.
     */
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = peerInfo->session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    if (peerInfo->session->fileManager->keyLen) {
        DFileGetCipherCaps(peerInfo->session, &settingFramePara);
        settingFramePara.deviceBits = DFileGetDeviceBits();
    }
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    /* An existing timer means negotiation is already running: just resend. */
    if (peerInfo->settingTimer == NULL) {
        if (peerInfo->session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
            peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, timeout,
                NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "setting timmer creat fail");
                data.errorCode = NSTACKX_EFAILED;
                NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
                return;
            }
        } else {
            peerInfo->settingTimer = TimerStart(peerInfo->session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                return;
            }
        }
    }

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &data);
    }
}
494
/*
 * Apply a negotiated DFileConfig to a peer and to the session's file manager.
 *
 * The peer's rate-control state is reset, its maximum send rate and data
 * frame size are taken from the config, and its initial send rate is derived
 * from the config's send rate using a connection-type specific divisor
 * (never below NSTACKX_MIN_SENDRATE). On non-LiteOS builds a WLAN peer
 * without a known Wi-Fi rate instead starts from a rate computed out of
 * NSTACKX_WLAN_INIT_RATE. The file manager is told the maximum frame length
 * and, for server sessions, the receive parameters for the connection type;
 * failures of those two calls are only logged.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    const int isWlan = (connType == CONNECT_TYPE_WLAN);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;

    if (isWlan) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    if (isWlan && !peerInfo->gotWifiRate) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set max frame length");
    }

    DFILE_LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
        connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    if (FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set recv para");
    }
}
531
/*
 * Record the values learned from a decoded setting frame in the peer.
 *
 * Stores the negotiation state, the remote MTU, connection type and session
 * id, and sets the MTU in use to the smaller of the local and remote MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    peerInfo->state = state;
    peerInfo->connType = settingFrame->connType;
    peerInfo->remoteSessionId = settingFrame->header.sessionId;

    peerInfo->mtu = settingFrame->mtu;
    if (peerInfo->localMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = peerInfo->localMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }
}
541
/*
 * Client-side handling of the setting frame returned by the server.
 *
 * Frames that fail to decode, carry an MTU of 0, or come from an address with
 * no registered peer are dropped. Otherwise the negotiation timer is stopped,
 * the peer is marked SETTING_NEGOTIATED, every existing trans receives the
 * MTU in use, DFILE_ON_CONNECT_SUCCESS is reported, and the session
 * configuration, link-sequence capability and cipher type are updated from
 * the server's reply.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    /* unsupport version */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }
    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer setting, maybe be attacked");
        return;
    }
    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "set trans mtu failed");
        }
    }
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }
    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* The flag is set in the reply, so the server does support link sequence. */
        DFILE_LOGI(TAG, "server replies support Link Sequence");
    } else {
        DFILE_LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
    DFileChooseCipherType(&hostSettingFrame, session);
}
585
/*
 * Return the local MTU for a socket protocol.
 *
 * UDP and TCP both use NSTACKX_DEFAULT_FRAME_SIZE. D2D is not supported: an
 * error is logged and 0 is returned. Any other protocol also yields 0.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    switch (protocol) {
        case NSTACKX_PROTOCOL_UDP:
        case NSTACKX_PROTOCOL_TCP:
            return NSTACKX_DEFAULT_FRAME_SIZE;
        case NSTACKX_PROTOCOL_D2D:
            DFILE_LOGE(TAG, "d2d not support");
            return 0;
        default:
            return 0;
    }
}
600
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)601 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
602 uint16_t connType, uint8_t socketIndex)
603 {
604 if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
605 return NULL;
606 }
607 PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
608 if (peerInfo == NULL) {
609 return NULL;
610 }
611 peerInfo->session = session;
612 peerInfo->dstAddr = *dstAddr;
613 peerInfo->connType = connType;
614 peerInfo->socketIndex = socketIndex;
615 peerInfo->localMtu = DFileGetMTU(session->protocol);
616 session->peerInfoCnt++;
617 peerInfo->gotWifiRate = 0;
618 peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
619 peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
620
621 if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
622 peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
623 DFILE_LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
624 }
625
626 if (peerMtu == 0) {
627 return peerInfo;
628 }
629 peerInfo->mtu = peerMtu;
630 peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
631 return peerInfo;
632 }
633
/*
 * Server-side capability negotiation from a client's setting frame.
 *
 * Link sequence is disabled for the session when the client's capability
 * field does not contain NSTACKX_CAPS_LINK_SEQUENCE. The recv-feedback
 * capability is mirrored into session->capsCheck according to the client's
 * capsCheck field.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    if (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        DFILE_LOGI(TAG, "client wants to enable Link Sequence");
    } else {
        DFILE_LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }

    /*
     * NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK is sent in the capsCheck field of
     * the setting frame (see DFileSessionSendSetting) and stored in
     * session->capsCheck, so it has to be tested in capsCheck as well rather
     * than in capability.
     */
    if (hostSettingFrame->capsCheck & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) {
        DFILE_LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        DFILE_LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
651
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)652 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
653 uint8_t socketIndex)
654 {
655 PeerInfo *peerInfo = NULL;
656
657 peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
658 if (peerInfo == NULL) {
659 return NULL;
660 }
661
662 peerInfo->remoteSessionId = frame->header.sessionId;
663 peerInfo->state = SETTING_NEGOTIATING;
664 DFileSessionSendSetting(peerInfo);
665 if (peerInfo->settingTimer == NULL) {
666 free(peerInfo);
667 session->peerInfoCnt--;
668 return NULL;
669 }
670 ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
671 return peerInfo;
672 }
673
/*
 * Server-side handling of a setting frame received from a client.
 *
 * Frames that fail to decode or carry an MTU of 0 are dropped. A known peer
 * is refreshed from the frame and answered again unless it has already been
 * answered MAX_NEGOTIATE_TIMEOUT_COUNT times, in which case the frame is
 * dropped. An unknown peer is allocated and answered. After a successful
 * answer the peer's setting counter is incremented and the session
 * configuration, capabilities and cipher type are updated from the frame.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame clientSetting;
    DFileConfig config;
    (void)memset_s(&clientSetting, sizeof(clientSetting), 0, sizeof(clientSetting));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &clientSetting) != NSTACKX_EOK || clientSetting.mtu == 0) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        peer = AllocPeerInfo(session, peerAddr, &clientSetting, socketIndex);
        if (peer == NULL) {
            return;
        }
    } else if (peer->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
        DFILE_LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
        return;
    } else {
        DFileSessionSetPeerInfo(peer, SETTING_NEGOTIATING, &clientSetting);
        DFileSessionSendSetting(peer);
    }

    peer->settingTimeoutCnt++;
    DFILE_LOGI(TAG, "DFileServer response Setting Frame. count %u", peer->settingTimeoutCnt);
    peer->remoteDFileVersion = clientSetting.dFileVersion;
    if (GetDFileConfig(&config, peer->mtuInuse, clientSetting.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, clientSetting.connType, peer);
    }
    HandleLinkSeqCap(session, &clientSetting);
    DFileChooseCipherType(&clientSetting, session);
}
713
/*
 * Dispatch a received setting frame to the server or client handler
 * according to the session type; other session types ignore the frame.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    switch (session->sessionType) {
        case DFILE_SESSION_TYPE_SERVER:
            DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
            break;
        case DFILE_SESSION_TYPE_CLIENT:
            DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
            break;
        default:
            break;
    }
}
725
/*
 * React to a peer's RST with NSTACKX_DFILE_WITHOUT_SETTING_ERROR by
 * restarting setting negotiation with that peer.
 *
 * RSTs from an address with no registered peer are logged and ignored.
 * Otherwise every trans of the session gets its MTU set to 0, the peer's
 * setting timer is deleted, the peer goes back to SETTING_NEGOTIATING and a
 * new setting frame is sent.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *pos = NULL;

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer rst, maybe be attacked");
        return;
    }

    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        /*
         * when client recv peer RST NSTACKX_DFILE_WITHOUT_SETTING_ERROR, Set Trans MTU 0,
         * and will reset after new setting negotiation.
         */
        if (DFileTransSetMtu((DFileTrans *)pos, 0) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    /*
     * SearchPeerInfoNode() already returned the first peer matching this
     * address and session id, so it is used directly instead of scanning
     * peerInfoChain a second time with the same predicate.
     */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    peerInfo->state = SETTING_NEGOTIATING;
    DFILE_LOGD(TAG, "Send Setting Frame");
    DFileSessionSendSetting(peerInfo);
}
761
/*
 * Handle a received RST frame.
 *
 * Frames that fail to decode are dropped. The only error code acted upon is
 * NSTACKX_DFILE_WITHOUT_SETTING_ERROR, which restarts setting negotiation;
 * any other code is logged as unsupported.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t rstCode = 0;
    if (DecodeRstFrame((RstFrame *)dFileFrame, &rstCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    /* The trans id is only needed for the debug log below. */
    uint16_t rstTransId = ntohs(dFileFrame->header.transId);
    DFILE_LOGD(TAG, "handle RST (%hu) frame, transId %hu", rstCode, rstTransId);

    if (rstCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
    } else {
        DFILE_LOGE(TAG, "Unspported error code %hu", rstCode);
    }
}
781
/*
 * Update the first `count` stopSendCnt counters from a back-pressure report,
 * under backPressLock.
 *
 * When backPress.recvListOverIo is 1 each counter is incremented; otherwise
 * each counter is reset to 0. If the lock cannot be taken nothing is changed.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    const int receiverOverloaded = (backPress.recvListOverIo == 1);
    for (uint32_t slot = 0; slot < count; slot++) {
        if (receiverOverloaded) {
            session->stopSendCnt[slot]++;
        } else {
            session->stopSendCnt[slot] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
808
/*
 * Handle a received back-pressure frame.
 *
 * The frame is ignored when the sender is not a registered peer or when it
 * fails to decode. Otherwise the decoded report is applied to one stop-send
 * counter per client send thread and then logged.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return;
    }

    DataBackPressure report;
    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &report) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, report, session->clientSendThreadNum);

    DFILE_LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
        report.recvListOverIo, report.recvBufThreshold,
        report.stopSendPeriod);
}
831
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)832 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
833 {
834 DFileTransPara transPara;
835
836 if (peerInfo == NULL) {
837 return NULL;
838 }
839 (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
840 transPara.isSender = isSender;
841 transPara.transId = transId; /* for receiver, use transId of sender */
842 transPara.fileManager = session->fileManager;
843 transPara.writeHandle = DFileWriteHandle;
844 transPara.msgReceiver = DTransMsgReceiver;
845 transPara.connType = peerInfo->connType;
846 transPara.context = peerInfo;
847 transPara.session = session;
848 transPara.onRenameFile = session->onRenameFile;
849
850 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
851 return DFileTransCreate(&transPara);
852 }
853
/*
 * Route a transfer-level frame to the DFileTrans identified by the frame's transId.
 * When no such transfer exists, a FILE_HEADER frame with a transId that is not already done
 * creates a new receiving transfer; any other frame is rejected.
 * Returns NSTACKX_EFAILED on rejection, otherwise the result of HandleDFileFrame.
 */
static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        DFILE_LOGI(TAG, "can't find peerInfo");
        return NSTACKX_EFAILED;
    }

    /* On a server session, a frame from a peer still negotiating settings completes the negotiation. */
    if (peer->session->sessionType == DFILE_SESSION_TYPE_SERVER && peer->state == SETTING_NEGOTIATING) {
        peer->state = SETTING_NEGOTIATED;
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
    }

    const uint16_t transId = ntohs(dFileFrame->header.transId);
    if (transId == 0) {
        DFILE_LOGE(TAG, "transId is 0");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), transId);
    if (trans != NULL) {
        return HandleDFileFrame(trans, dFileFrame);
    }

    /* Only HEADER frame can start dfile transfer (receiver) */
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
        DFILE_LOGE(TAG, "trans %u is NULL && type is %u", transId, dFileFrame->header.type);
        return NSTACKX_EFAILED;
    }
    if (IsTransIdDone(session, transId) == NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }

    trans = CreateTrans(transId, session, peer, NSTACKX_FALSE);
    if (trans == NULL) {
        DFileMsg fatalMsg;
        (void)memset_s(&fatalMsg, sizeof(fatalMsg), 0, sizeof(fatalMsg));
        fatalMsg.errorCode = NSTACKX_ENOMEM;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &fatalMsg);
        DFILE_LOGE(TAG, "trans is NULL");
        return NSTACKX_EFAILED;
    }

    /* A failed MTU update is only logged; the transfer is still registered. */
    if (DFileTransSetMtu(trans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    CalculateSessionTransferRatePrepare(session);
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    peer->currentTransCount++;

    return HandleDFileFrame(trans, dFileFrame);
}
903
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)904 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
905 {
906 PeerInfo *peerInfo = context;
907 DFileSession *session = peerInfo->session;
908 QueueNode *queueNode = NULL;
909 struct sockaddr_in peerAddr;
910
911 peerAddr = peerInfo->dstAddr;
912 queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
913 if (queueNode == NULL) {
914 return NSTACKX_ENOMEM;
915 }
916
917 if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
918 DFILE_LOGE(TAG, "pthread mutex lock failed");
919 DestroyQueueNode(queueNode);
920 return NSTACKX_EFAILED;
921 }
922 ListInsertTail(&session->outboundQueue, &queueNode->list);
923 session->outboundQueueSize++;
924 if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
925 /* Don't need to free node, as it's mount to outboundQueue */
926 DFILE_LOGE(TAG, "pthread mutex unlock failed");
927 return NSTACKX_EFAILED;
928 }
929 SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
930 return (int32_t)len;
931 }
932
BindMainLoopToTargetCpu(void)933 static void BindMainLoopToTargetCpu(void)
934 {
935 int32_t cpu;
936 int32_t cpus = GetCpuNum();
937 if (cpus >= FIRST_CPU_NUM_LEVEL) {
938 return;
939 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
940 cpu = CPU_IDX_0;
941 } else {
942 return;
943 }
944 StartThreadBindCore(cpu);
945 }
946
/*
 * Compute the timeout for the next epoll wait.
 * Returns 0 when the main loop is expected to read actively and inbound frames are queued;
 * otherwise the smallest non-negative timeout reported by any transfer, capped at
 * DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;

    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t transTimeout = DFileTransGetTimeout((DFileTrans *)node);
        /* Negative timeouts are ignored. */
        if (transTimeout >= 0 && transTimeout < shortest) {
            shortest = transTimeout;
        }
    }

    return (shortest > DEFAULT_WAIT_TIME_MS) ? DEFAULT_WAIT_TIME_MS : shortest;
}
969
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)970 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
971 {
972 List *tmp = NULL;
973 List *pos = NULL;
974 LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
975 DFileTrans *trans = (DFileTrans *)pos;
976 if (trans->transId != exceptTransId) {
977 DFileTransProcess(trans);
978 }
979 }
980 }
981
NotifyBindPort(const DFileSession * session)982 static void NotifyBindPort(const DFileSession *session)
983 {
984 DFileMsg data;
985 int32_t socketNum;
986 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
987 socketNum = 1;
988
989 for (int i = 0; i < socketNum; i++) {
990 data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
991 data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
992 }
993 NotifyMsgRecver(session, DFILE_ON_BIND, &data);
994 }
995
/*
 * Entry point of the session main thread ("arg" is the DFileSession).
 * Runs the epoll loop until session->closeFlag is set or EpollLoop fails, processing all
 * transfers after every wake-up. On exit it reports a fatal error when the loop failed or
 * the session fatal flag is set, then wakes the sender and receiver threads.
 * Always returns NULL.
 */
void *DFileMainLoop(void *arg)
{
    DFileSession *session = arg;
    int32_t ret = NSTACKX_EOK;
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    uint8_t isBind = NSTACKX_FALSE;
    DFILE_LOGI(TAG, "main thread start");
    SetThreadName(DFFILE_MAIN_THREAD_NAME);
    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_MAIN_THERD_START);
    NotifyBindPort(session);
    while (!session->closeFlag) {
        /* Wait no longer than the nearest transfer timeout (0 when inbound frames are pending). */
        int64_t minTimeout = GetEpollWaitTimeOut(session);
        ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
        if (ret == NSTACKX_EFAILED) {
            DFILE_LOGE(TAG, "epoll wait failed");
            break;
        }
        /* CPU binding is attempted once, the first time transFlag is seen set. */
        if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
            BindMainLoopToTargetCpu();
            isBind = NSTACKX_TRUE;
        }
        /* exceptTransId 0 excludes only a transfer whose transId is 0. */
        ProcessSessionTrans(session, 0);
        if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
            /* partReadFlag makes ReadEventHandle apply its time limit and skip the event-count decrement. */
            session->partReadFlag = NSTACKX_TRUE;
            ReadEventHandle(session);
            session->partReadFlag = NSTACKX_FALSE;
        }
    }

    if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
        msgData.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
    }

    /* Notify sender thread to terminate */
    PostOutboundQueueWait(session);

    /* Unblock "select" and notify receiver thread to terminate */
    NotifyPipeEvent(session);
    return NULL;
}
1039
/*
 * Reset the amended send rate of the peer to its configured send rate and log
 * the configured rate, the measured frame rate and the new amended rate.
 */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    peerInfo->amendSendRate = peerInfo->sendRate;
    DFILE_LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
        peerInfo->sendRate, peerInfo->sendFrameRate, peerInfo->amendSendRate);
}
1046
/*
 * Periodically compute and log the sending statistics of one peer (client sessions only).
 * Nothing happens until more than peerInfo->rateStateInterval microseconds have elapsed
 * since peerInfo->startTime. When the interval has elapsed:
 *   - the peer with socketIndex 0 refreshes the file manager's IO read rate and resets
 *     its byte and "send list full" counters;
 *   - the peer's frame rate, MB/s rate and average qdisc value are recomputed and logged;
 *   - the amended send rate is reset, session and peer statistics are cleared and the
 *     measurement window is restarted.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* Statistics are refreshed only once per rateStateInterval; measureElapse is non-zero past this check. */
    if (measureElapse <= peerInfo->rateStateInterval) {
        return;
    }

    /* Snapshot with a known 64-bit type; used for the arithmetic and for the %llu log argument. */
    uint64_t sendCount = (uint64_t)peerInfo->sendCount;

    /* ONLY PEER 0 calculate io rate */
    if (peerInfo->socketIndex == 0) {
        session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        DFILE_LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
            session->fileManager->sendListFullTimes);
        session->fileManager->sendListFullTimes = 0;
        session->fileManager->iorBytes = 0;
    }

    peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
    peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
        NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
    /* qdiscAveLeft holds a running sum until here; turn it into an average when samples exist. */
    if (peerInfo->qdiscSearchNum != 0) {
        peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
    }

    /*
     * The "ave qdisc" fragment ends with a space so it no longer runs into "totalRecvBlocks".
     * sendCount is passed from the local uint64_t copy because the declared type of
     * peerInfo->sendCount is not visible here and %llu needs a 64-bit argument.
     */
    DFILE_LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu,"
        "measureElapse %llu sendFrameRate %u %uMB/s,"
        "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u,"
        "min qdisc %u max qdisc %u search num %u ave qdisc %u "
        "totalRecvBlocks %llu socket:%u "
        "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
        peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
        sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
        NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
        NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
        NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
        peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
        NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
        peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);
    AmendPeerInfoSendRate(peerInfo);
    ClearSessionStats(session);
    ClearPeerinfoStats(peerInfo);
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
}
1098
/*
 * Sum the retryCount of every transfer in the session and store the total in
 * peerInfo->allDtransRetryCount.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t retrySum = 0;
    List *node = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        retrySum += ((DFileTrans *)node)->retryCount;
    }

    peerInfo->allDtransRetryCount = retrySum;
}
1110
/*
 * Update receive-side statistics for one incoming FILE_DATA frame (server sessions only).
 * Frames of other types, non-server sessions and unknown peers are ignored.
 * Two independent measurement windows, both longer than peerInfo->rateStateInterval
 * microseconds, are maintained:
 *   - session window (session->measureBefore): refreshes the file manager's IO write rate,
 *     write count and maximum write rate, then clears iowBytes;
 *   - peer window (peerInfo->startTime): refreshes the peer's receive frame rate and MB/s
 *     rate, then clears recvCount.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    struct timespec nowTime;
    uint64_t recvCount;
    uint32_t timeOut;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER ||
        dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        return;
    }

    timeOut = (peerInfo->connType == CONNECT_TYPE_P2P) ? NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT :
        NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;

    peerInfo->recvCount++;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
    /* measureElapse is strictly greater than the interval here, so dividing by it is safe. */
    if (measureElapse > peerInfo->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
        /*
         * dataFrameSize is a divisor; a zero value would be an integer division by zero.
         * In that case the previous iowCount is kept.
         * NOTE(review): whether dataFrameSize can be 0 at this point is not visible here - confirm.
         */
        if (peerInfo->dataFrameSize != 0) {
            session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
                (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
        } else {
            DFILE_LOGE(TAG, "dataFrameSize is 0, keep previous iowCount");
        }
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        DFILE_LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
            session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    if (measureElapse > peerInfo->rateStateInterval) {
        recvCount = (uint64_t)peerInfo->recvCount;
        peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        peerInfo->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1157
CheckElapseTime(const struct timespec * before,uint64_t overRun)1158 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1159 {
1160 struct timespec now;
1161 ClockGetTime(CLOCK_MONOTONIC, &now);
1162 uint64_t elapseUs = GetTimeDiffUs(&now, before);
1163 elapseUs += overRun;
1164 if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1165 if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1166 DFILE_LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1167 }
1168 return 0;
1169 } else {
1170 uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1171 return delta;
1172 }
1173 }
1174
BindClientSendThreadToTargetCpu(uint32_t idx)1175 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1176 {
1177 int32_t cpu;
1178 int32_t cpus = GetCpuNum();
1179 if (cpus >= FIRST_CPU_NUM_LEVEL) {
1180 return;
1181 } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1182 cpu = CPU_IDX_1 + (int32_t)idx;
1183 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1184 cpu = CPU_IDX_1;
1185 } else {
1186 return;
1187 }
1188 if (cpu > cpus) {
1189 cpu = cpus - 1;
1190 }
1191 StartThreadBindCore(cpu);
1192 }
1193
/*
 * For client sessions, restart the rate-measurement window of the peer that uses
 * "socketIndex" and try to bind the calling send thread to its target CPU.
 * Nothing is done for other session types or when no peer is found.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer != NULL) {
        ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
        BindClientSendThreadToTargetCpu(0);
    }
}
1205
TerminateMainThreadFatalInner(void * arg)1206 static void TerminateMainThreadFatalInner(void *arg)
1207 {
1208 DFileSession *session = (DFileSession *)arg;
1209 DFileSessionSetFatalFlag(session);
1210 }
1211
/*
 * Ask the main loop to set the session fatal flag by posting an event to its epoll fd.
 * If the event cannot be posted, the flag is set directly from the calling thread.
 */
static void PostFatalEvent(DFileSession *session)
{
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session) == NSTACKX_EOK) {
        return;
    }

    DFILE_LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1219
GetSocketWaitMs(uint32_t threadNum)1220 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1221 {
1222 if (threadNum > 1) {
1223 return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1224 }
1225 return DEFAULT_WAIT_TIME_MS;
1226 }
1227
SetSendThreadName(uint32_t threadIdx)1228 static void SetSendThreadName(uint32_t threadIdx)
1229 {
1230 char name[MAX_THREAD_NAME_LEN] = {0};
1231 if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1232 DFILE_LOGE(TAG, "sprintf send thead name failed");
1233 }
1234 SetThreadName(name);
1235 }
1236
/*
 * Start-up arguments of one additional sender thread. An instance is allocated by
 * CreateAddiSendThread and freed by DFileAddiSenderHandle once its fields are copied.
 */
typedef struct {
    struct DFileSession *session; /* session the thread sends data for */
    uint32_t threadIdx; /* index of the thread's slot in session->sendThreadPara */
}SendThreadCtx;
1241
/*
 * Entry point of an additional client sender thread.
 * "arg" is a heap-allocated SendThreadCtx; its fields are copied and it is freed at once.
 * The thread sends data frames with SendDataFrame on socket index 0 until
 * session->addiSenderCloseFlag or session->closeFlag is set, or a send fails with an error
 * other than NSTACKX_EAGAIN, in which case a fatal event is posted.
 * Always returns NULL.
 */
static void *DFileAddiSenderHandle(void *arg)
{
    SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
    struct DFileSession *session = sendThreadCtx->session;
    uint32_t threadIdx = sendThreadCtx->threadIdx;
    free(sendThreadCtx);
    int32_t ret = NSTACKX_EOK;
    DFILE_LOGI(TAG, "send thread %u start", threadIdx);
    SetSendThreadName(threadIdx);
    SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
    List unsent;
    uint8_t canWrite = NSTACKX_FALSE;
    uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
    BindClientSendThreadToTargetCpu(threadIdx + 1);
    ListInitHead(&unsent);
    while (!session->addiSenderCloseFlag) {
        /* Nothing left over and nothing pending in the file manager: sleep until sendWait is posted. */
        if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
            NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
            SemWait(&session->sendThreadPara[threadIdx].sendWait);
            if (session->addiSenderCloseFlag) {
                break;
            }
        }
        /* The previous send returned EAGAIN: wait until socket[0] is writable before retrying. */
        if (ret == NSTACKX_EAGAIN) {
            ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
            if (ret != NSTACKX_EOK || session->closeFlag) {
                break;
            }
            if (!canWrite) {
                /* Wait timed out without the socket becoming writable: keep the EAGAIN state. */
                ret = NSTACKX_EAGAIN;
                continue;
            }
        }
        ret = SendDataFrame(session, &unsent, threadIdx, 0);
        if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
            break;
        }
        if (ret == NSTACKX_EAGAIN) {
            continue;
        }
        /* Send finished: block until semNewCycle is posted (DFileSessionSendFrame posts it per cycle). */
        SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
    }
    if (ret < 0 && ret != NSTACKX_EAGAIN) {
        PostFatalEvent(session);
    }
    DestroyIovList(&unsent, session, threadIdx);
    return NULL;
}
1290
CloseAddiSendThread(struct DFileSession * session)1291 static void CloseAddiSendThread(struct DFileSession *session)
1292 {
1293 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1294 return;
1295 }
1296 session->addiSenderCloseFlag = NSTACKX_TRUE;
1297 for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1298 SemPost(&session->sendThreadPara[i].sendWait);
1299 SemPost(&session->sendThreadPara[i].semNewCycle);
1300 PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1301 SemDestroy(&session->sendThreadPara[i].sendWait);
1302 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1303 }
1304 }
1305
ErrorHandle(struct DFileSession * session,uint16_t cnt)1306 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1307 {
1308 while (cnt > 0) {
1309 SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1310 SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1311 PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1312 SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1313 SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1314 cnt--;
1315 }
1316 }
1317
CreateAddiSendThread(struct DFileSession * session)1318 static int32_t CreateAddiSendThread(struct DFileSession *session)
1319 {
1320 uint16_t i;
1321 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1322 return NSTACKX_EOK;
1323 }
1324 session->addiSenderCloseFlag = NSTACKX_FALSE;
1325 for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1326 SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1327 if (sendThreadCtx == NULL) {
1328 goto L_ERR_SENDER_THREAD;
1329 }
1330
1331 if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1332 free(sendThreadCtx);
1333 goto L_ERR_SENDER_THREAD;
1334 }
1335
1336 if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1337 SemDestroy(&session->sendThreadPara[i].sendWait);
1338 free(sendThreadCtx);
1339 goto L_ERR_SENDER_THREAD;
1340 }
1341
1342 sendThreadCtx->session = session;
1343 sendThreadCtx->threadIdx = i;
1344 if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1345 DFILE_LOGE(TAG, "Create sender thread failed");
1346 SemDestroy(&session->sendThreadPara[i].sendWait);
1347 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1348 free(sendThreadCtx);
1349 goto L_ERR_SENDER_THREAD;
1350 }
1351 }
1352 return NSTACKX_EOK;
1353
1354 L_ERR_SENDER_THREAD:
1355 session->addiSenderCloseFlag = NSTACKX_TRUE;
1356 ErrorHandle(session, i);
1357 return NSTACKX_EFAILED;
1358 }
1359
/*
 * Fold one qdisc sample into the peer statistics: minimum, maximum, sample count and
 * running sum (kept in qdiscAveLeft). A stored minimum of 0 is treated as "not set yet"
 * and is always replaced by the sample.
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }

    if (peerInfo->qdiscMaxLeft < qDiscLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1373
/*
 * Limit the amended send rate of the peer by the value GetQdiscLen reports for the root
 * queue of its local interface. That value is recorded in the peer's qdisc statistics and
 * divided by the number of MTUs per data frame (at least 1) to get a frame budget; the
 * amended send rate becomes the smaller of sendRate and that budget.
 * Nothing changes when the qdisc query fails or mtuInuse is 0.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t qdiscLeft;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qdiscLeft) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, qdiscLeft);

    uint32_t frameBudget = qdiscLeft / mtusPerFrame;
    if (peerInfo->sendRate < frameBudget) {
        peerInfo->amendSendRate = peerInfo->sendRate;
    } else {
        peerInfo->amendSendRate = (int32_t)frameBudget;
    }
}
1397
/*
 * Close a send cycle for one socket: mark the cycle as not running, pace the sender when
 * the number of frames sent in this interval reached the (positive) amended send rate,
 * reset the interval counter and refresh the amended send rate from the qdisc state.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    const int rateReached = (curAmendSendRate > 0) &&
        (peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate);
    if (rateReached) {
        /* A pending decrease discards the overrun carried over from earlier cycles. */
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->overRun = 0;
            peerInfo->decreaseStatus = 0;
        }
        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        /* CheckElapseTime returns 0 on the path where it slept for the rest of the interval. */
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1416
/*
 * Run one send cycle of the main sender thread for "socketIndex".
 *   1. When CapsTcp(session) is true and "unsent" is not empty, the leftover data frames
 *      are sent first.
 *   2. Queued outbound frames are sent with SendOutboundFrame; non-client sessions stop here.
 *   3. Client sessions with a negotiated MTU then send data frames, update the rate
 *      statistics, pace the cycle and post semNewCycle for every additional sender thread.
 * Returns the result of the last send step, or NSTACKX_EFAILED when the client peer for
 * "socketIndex" cannot be found.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
    struct timespec *before, uint8_t socketIndex)
{
    int32_t ret;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        /* sendRemain is raised only for the duration of this leftover send. */
        session->sendRemain = 1;
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
            DFILE_LOGI(TAG, "ret is %d", ret);
            return ret;
        }
    }

    ret = SendOutboundFrame(session, preQueueNode);
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return ret;
    }
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    /* Data frames are not sent while mtuInuse is still 0. */
    if (peerInfo->mtuInuse == 0) {
        return ret;
    }

    if (ret == NSTACKX_EOK) {
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }
    /* Snapshot the rate first: DFileSendCalculateRate may reset amendSendRate. */
    int32_t curAmendSendRate = peerInfo->amendSendRate;
    DFileSendCalculateRate(session, peerInfo);

    if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
        DFILE_LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", ret, peerInfo->intervalSendCount);
        return ret;
    }

    UpdateInfoAfterSend(session, peerInfo, curAmendSendRate, before, socketIndex);

    /* Release the additional sender threads that wait on semNewCycle after their send. */
    for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
        SemPost(&session->sendThreadPara[i].semNewCycle);
    }
    return ret;
}
1466
/*
 * Block the sender thread on outboundQueueWait[socketIndex] when there is nothing to send:
 * no partially sent queue node, no unsent data frames, no pending data in the file manager
 * and an empty outbound queue. Returns immediately if any of these has work.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL) {
        return;
    }
    if (!ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager)) {
        return;
    }
    if (session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1475
/*
 * One-time set-up of the main sender thread: name the thread, restart the peer's
 * measurement window, raise the thread priority and record the thread id in the bind info.
 * The id is recorded only when its slot lies below
 * POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t slot = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    if (slot < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, slot);
    }
}
1488
/*
 * Tear down the main sender thread: log the total number of sent blocks, stop the
 * additional sender threads, release the unsent data frames and the pending queue node,
 * and free the thread argument "arg".
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    DFILE_LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    CloseAddiSendThread(session);
    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    /* The thread argument is owned by the sender thread and released here. */
    free(arg);
}
1498
DFileSenderHandle(void * arg)1499 void *DFileSenderHandle(void *arg)
1500 {
1501 DFileSession *session = ((SenderThreadPara*)arg)->session;
1502 uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1503 QueueNode *queueNode = NULL;
1504 List unsent;
1505 int32_t ret = NSTACKX_EOK;
1506 struct timespec before;
1507 uint8_t canWrite = NSTACKX_FALSE;
1508 uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1509 uint8_t isBind = NSTACKX_FALSE;
1510
1511 if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1512 PostFatalEvent(session);
1513 return NULL;
1514 }
1515 ListInitHead(&unsent);
1516 DFileSenderPre(session, socketIndex);
1517 while (!session->closeFlag) {
1518 WaitNewSendData(queueNode, &unsent, session, socketIndex);
1519 if (session->closeFlag) {
1520 break;
1521 }
1522 if (!session->cycleRunning[socketIndex]) {
1523 ClockGetTime(CLOCK_MONOTONIC, &before);
1524 }
1525 if (ret == NSTACKX_EAGAIN) {
1526 ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1527 if (ret != NSTACKX_EOK || session->closeFlag) {
1528 break;
1529 }
1530 if (!canWrite) {
1531 ret = NSTACKX_EAGAIN;
1532 continue;
1533 }
1534 }
1535 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1536 session->transFlag == NSTACKX_TRUE) {
1537 BindClientSendThreadToTargetCpu(0);
1538 isBind = NSTACKX_TRUE;
1539 }
1540 ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1541 if (ret < 0 && ret != NSTACKX_EAGAIN) {
1542 PeerShuttedEvent();
1543 PostFatalEvent(session);
1544 break;
1545 }
1546 }
1547 DFileSenderClose(session, queueNode, &unsent, arg);
1548 return NULL;
1549 }
1550
/*
 * Unlink and destroy every QueueNode in the list "head".
 * A NULL or empty list is left untouched.
 */
static void ClearDFileFrameList(List *head)
{
    List *cur = NULL;
    List *next = NULL;

    if (head == NULL) {
        return;
    }
    if (ListIsEmpty(head)) {
        return;
    }

    LIST_FOR_EACH_SAFE(cur, next, head) {
        QueueNode *stale = (QueueNode *)cur;
        ListRemoveNode(&stale->list);
        DestroyQueueNode(stale);
    }
}
1564
CheckDfileType(uint8_t type)1565 static inline int32_t CheckDfileType(uint8_t type)
1566 {
1567 if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1568 return NSTACKX_EFAILED;
1569 }
1570 return NSTACKX_EOK;
1571 }
1572
/*
 * Validate a session type: NSTACKX_EOK when it lies in
 * [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER], NSTACKX_EFAILED otherwise.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    if (type >= DFILE_SESSION_TYPE_CLIENT && type <= DFILE_SESSION_TYPE_SERVER) {
        return NSTACKX_EOK;
    }
    return NSTACKX_EFAILED;
}
1580
/*
 * Drain the list "head" of received frames and dispatch each frame by type:
 * SETTING, session-level RST (transId 0) and BACK_PRESSURE frames are handled by the
 * session, everything else is routed to its transfer. Frames whose header length exceeds
 * the maximum payload are dropped. Processing stops early when session->closeFlag is set;
 * whatever is still queued is destroyed before returning.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    /* Kept across iterations: only the branches below that assign it change its value. */
    int32_t handleFrameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }

        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *from = &node->peerAddr;
        /* Read the type up front: the frame memory may be invalid after it has been handled. */
        uint8_t type = frame->header.type;
        uint16_t payloadLen = ntohs(frame->header.length);

        if (payloadLen > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            DFILE_LOGE(TAG, "header length %u is too big", payloadLen);
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(type) != NSTACKX_EOK ||
            CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            handleFrameRet = NSTACKX_EFAILED;
        } else if (type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, from, node->socketIndex);
        } else if (type == NSTACKX_DFILE_RST_FRAME && frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, from);
        } else if (type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, from);
        } else {
            /*
             * For NSTACKX_DFILE_FILE_DATA_FRAME, "frame" may be free when handling failed, and become invalid.
             */
            handleFrameRet = DFileSessionHandleFrame(session, frame, from);
        }

        /* For FILE_DATA frame, the memory is passed to file manager. */
        if (handleFrameRet != NSTACKX_EOK || type != NSTACKX_DFILE_FILE_DATA_FRAME) {
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }

    ClearDFileFrameList(head);
}
1627
/*
 * Process the frames queued in session->inboundQueue ("arg" is the DFileSession).
 * Each round moves the whole inbound queue to a private list under inboundQueueLock and
 * processes that list without the lock. Rounds repeat while frames keep arriving and
 * the session is not closing.
 * When session->partReadFlag is set (DFileMainLoop sets it around its direct call),
 * the work is limited to DEFAULT_WAIT_TIME_MS; otherwise the unprocessed read-event
 * counter is decremented.
 */
static void ReadEventHandle(void *arg)
{
    DFileSession *session = arg;
    List newHead;
    ListInitHead(&newHead);
    struct timespec before, now;
    if (!session->partReadFlag) {
        NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    } else {
        /* Start of the time budget; "before" is only read when partReadFlag is set. */
        ClockGetTime(CLOCK_MONOTONIC, &before);
    }
    while (session->inboundQueueSize && !session->closeFlag) {
        if (session->partReadFlag) {
            ClockGetTime(CLOCK_MONOTONIC, &now);
            if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
                break;
            }
        }
        if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "PthreadMutexLock error");
            return;
        }
        /* Take over everything queued so far and account for it, then release the lock. */
        ListMove(&session->inboundQueue, &newHead);
        session->recvBlockNumInner += session->inboundQueueSize;
        session->inboundQueueSize = 0;
        if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "PthreadMutexUnlock error");
            break;
        }
        ProcessDFileFrameList(session, &newHead);
    }
    /* Destroy frames that were taken over but not processed (e.g. after an unlock failure). */
    ClearDFileFrameList(&newHead);
}
1661
/*
 * Wrap a received frame in a queue node and append it to the session's inbound queue.
 *
 * Returns NSTACKX_ENOMEM when the queue already holds more than MAX_RECVBUF_COUNT nodes or
 * the node cannot be created, NSTACKX_EFAILED when locking or unlocking inboundQueueLock
 * fails, and NSTACKX_EOK otherwise.
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    QueueNode *node = NULL;

    if (session->inboundQueueSize <= MAX_RECVBUF_COUNT) {
        node = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    } else if (session->inboundQueueSize % MAX_NOMEM_PRINT == 0) {
        /* Queue is over its limit: log only occasionally to avoid flooding. */
        DFILE_LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
    }
    if (node == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        DestroyQueueNode(node);
        return NSTACKX_EFAILED;
    }
    ListInsertTail(&session->inboundQueue, &node->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* On unlock failure the node already belongs to the list, so it is not destroyed here. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) == 0) ? NSTACKX_EOK : NSTACKX_EFAILED;
}
1691
/* Store the current monotonic time in session->measureBefore; only done for server sessions. */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1698
/*
 * Bind the calling thread to a CPU core chosen from the session type and the CPU count.
 *
 * Non-server sessions always bind to CPU_IDX_2. Server sessions bind to CPU_IDX_1 or
 * CPU_IDX_0 depending on which CPU-count level is reached, and do not bind at all when the
 * count reaches FIRST_CPU_NUM_LEVEL or stays below THIRD_CPU_NUM_LEVEL.
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t cpuCount = GetCpuNum();

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        StartThreadBindCore(CPU_IDX_2);
        return;
    }
    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        StartThreadBindCore(CPU_IDX_1);
        return;
    }
    if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        StartThreadBindCore(CPU_IDX_0);
    }
}
1719
/*
 * Post a ReadEventHandle event for the session unless MAX_UNPROCESSED_READ_EVENT_COUNT
 * events are already pending.
 *
 * mainLoopActiveReadFlag is cleared when the event was posted and set when posting failed.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    /* Posting failed: undo the pending-event accounting done above. */
    DFILE_LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1734
/*
 * Validate a received buffer as a DFile frame and queue it for ReadEventHandle.
 *
 * Returns NSTACKX_EOK when the frame was queued, and also when it was dropped because it
 * failed to decode or because DFileAddInboundQueue() reported NSTACKX_ENOMEM. Returns
 * NSTACKX_EFAILED for any other queueing error.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *decoded = NULL;
    int32_t queueRet;

    if (DecodeDFileFrame(buf, bufLen, &decoded) != NSTACKX_EOK) {
        /* A frame that cannot be decoded is dropped without failing the receive path. */
        DFILE_LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    queueRet = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    if (queueRet == NSTACKX_ENOMEM) {
        /* The frame could not be queued; treat it as a silent drop. */
        return NSTACKX_EOK;
    }
    if (queueRet == NSTACKX_EOK) {
        PostReadEventToMainLoop(session);
        DFileRecvCalculateRate(session, decoded, peerAddr);
        return NSTACKX_EOK;
    }

    DFILE_LOGE(TAG, "frame add in bound queue failed :%d", queueRet);
    return NSTACKX_EFAILED;
}
1757
/*
 * One-time setup performed by the receiver thread before it enters its receive loop:
 * names the thread, records the measure start time (server sessions only), raises the
 * thread priority and records the thread in the session's bind info.
 */
static void DFileRecverPre(DFileSession *session)
{
    SetThreadName(DFFILE_RECV_THREAD_NAME);
    DFileReceiverUpdateSessionMeasureTime(session);
    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1765
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for a read event on the receiver's socket.
 *
 * Waits on the accepted socket once acceptFlag is non-zero, otherwise on socket[0].
 * *canRead and the return value come straight from WaitSocketEvent().
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    return WaitSocketEvent(session,
        (session->acceptFlag == 0) ? session->socket[0]->sockfd : session->acceptSocket->sockfd,
        DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1774
/* Receive from the session socket; thin wrapper that forwards to DFileSocketRecvSP() and returns its result. */
int32_t DFileSocketRecv(DFileSession *session)
{
    return DFileSocketRecvSP(session);
}
1779
/*
 * Accept a connection on session->socket[0] and store it in session->acceptSocket.
 *
 * On success acceptFlag is set, TCP keep-alive is configured on the accepted descriptor,
 * the accept event is reported, and NSTACKX_EOK is returned. Returns NSTACKX_EFAILED when
 * AcceptSocket() yields NULL.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        DFILE_LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    DFILE_LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);
    AcceptSocketEvent();
    return NSTACKX_EOK;
}
1796
/*
 * Receiver thread entry point; arg is the DFileSession to serve.
 *
 * Until closeFlag is set, the thread waits for the socket to become readable and then calls
 * DFileSocketRecv(). A wait failure, or a receive result other than NSTACKX_EOK or
 * NSTACKX_EAGAIN, ends the loop. On exit a fatal event is posted when ret is negative and
 * is neither NSTACKX_EAGAIN nor NSTACKX_PEER_CLOSE. Always returns NULL.
 */
void *DFileReceiverHandle(void *arg)
{
    DFileSession *session = arg;
    uint8_t canRead = NSTACKX_FALSE;
    /* Starts as NSTACKX_EAGAIN so that the first iteration waits on the socket. */
    int32_t ret = NSTACKX_EAGAIN;
    /* Ensures the CPU binding below is attempted only once. */
    uint8_t isBind = NSTACKX_FALSE;

    DFILE_LOGI(TAG, "recv thread start");
    DFileRecverPre(session);
    while (!session->closeFlag) {
        /* Wait again only when the last receive returned NSTACKX_EAGAIN or nothing is readable. */
        if (ret == NSTACKX_EAGAIN || !canRead) {
            ret = RcverWaitSocket(session, &canRead);
            if (ret != NSTACKX_EOK || session->closeFlag) {
                break;
            }
        }
        if (!canRead) {
            continue;
        }
        /* Defer CPU binding until transFlag has been set on the session. */
        if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
            BindServerRecvThreadToTargetCpu(session);
            isBind = NSTACKX_TRUE;
        }

        ret = DFileSocketRecv(session);
        if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
            PeerShuttedEvent();
            break;
        }
    }
    DFILE_LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect,
        session->recvBlockNumInner);
    /* NSTACKX_EAGAIN and NSTACKX_PEER_CLOSE are not reported as fatal. */
    if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
        PostFatalEvent(session);
    }

    DFILE_LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
    return NULL;
}
1836
DFileControlHandle(void * arg)1837 void *DFileControlHandle(void *arg)
1838 {
1839 SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1840 DFileSession *session = arg;
1841 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1842 DFileSenderControlHandle(session);
1843 } else {
1844 DFileReceiverControlHandle(session);
1845 }
1846 return NULL;
1847 }
1848
/*
 * Replace every entry of fileListInfo->files[] with its realpath()-resolved form.
 *
 * Each original string is freed once its replacement has been obtained. Stops at the first
 * entry that cannot be resolved, leaving that entry and the following ones untouched, and
 * returns NSTACKX_EFAILED; returns NSTACKX_EOK when all entries were resolved.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    uint32_t idx;

    for (idx = 0; idx < fileListInfo->fileNum; idx++) {
        /* With a NULL buffer, realpath() allocates the resolved path itself. */
        char *resolved = realpath(fileListInfo->files[idx], NULL);
        if (resolved == NULL) {
            DFILE_LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        free(fileListInfo->files[idx]);
        fileListInfo->files[idx] = resolved;
    }
    return NSTACKX_EOK;
}
1868
/*
 * Free the FileListInfo container, its files[] array and its remotePath string.
 * The strings referenced by files[] are not freed here.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    free(fileListInfo->files);
    fileListInfo->files = NULL;

    /* free(NULL) is a no-op, so no guard is needed for an absent remote path. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo);
}
1879
/*
 * Create a sending transfer for fileListInfo and append it to the session's transfer chain.
 *
 * The new transfer takes the id following session->lastDFileTransId (0 is skipped). On
 * success the fileListInfo container is released via FreeTransFileListInfo() and
 * NSTACKX_EOK is returned. On any failure the transfer, if created, is destroyed and
 * fileListInfo is not freed here. DFileStartTrans() calls this with transIdLock held.
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    uint16_t transId = session->lastDFileTransId + 1;
    if (transId == 0) { /* overflow */
        transId = 1;
    }

    /* NOTE(review): peerInfo is dereferenced below without a NULL check - confirm TransSelectPeerInfo() cannot return NULL. */
    PeerInfo *peerInfo = TransSelectPeerInfo(session);
    DFileTrans *trans = CreateTrans(transId, session, peerInfo, NSTACKX_TRUE);
    if (trans == NULL) {
        DFILE_LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to set the MTU is only logged; the transfer continues. */
    if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    int32_t ret = DFileTransSendFiles(trans, fileListInfo);
    if (ret != NSTACKX_EOK) {
        DFileTransDestroy(trans);
        DFILE_LOGE(TAG, "DFileTransSendFiles fail, error: %d", ret);
        return ret;
    }
    ret = DFileTransAddExtraInfo(trans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
        fileListInfo->userData);
    if (ret != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(trans);
        return NSTACKX_EFAILED;
    }
    /* Copy the per-list flags and the tar file reference into the transfer's file list. */
    trans->fileList->tarFlag = fileListInfo->tarFlag;
    trans->fileList->smallFlag = fileListInfo->smallFlag;
    trans->fileList->tarFile = fileListInfo->tarFile;
    trans->fileList->noSyncFlag = fileListInfo->noSyncFlag;

    /* NOTE(review): presumably userData is now referenced by the transfer (passed to DFileTransAddExtraInfo above) - confirm. */
    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    session->lastDFileTransId = transId;
    if (fileListInfo->smallFlag == NSTACKX_TRUE) {
        session->smallListProcessingCnt++;
    } else {
        session->fileListProcessingCnt++;
    }
    /* Elements in fileListInfo->files[] are reused by the DFileTrans, so they don't need to be freed here. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1931
/*
 * Start a transfer for fileListInfo while holding the session's transIdLock.
 *
 * Returns NSTACKX_EFAILED when the lock cannot be taken; otherwise returns the result of
 * DFileStartTransInner(). An unlock failure is logged but does not change the result.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    int32_t startRet;

    if (PthreadMutexLock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    startRet = DFileStartTransInner(session, fileListInfo);

    if (PthreadMutexUnlock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1944
TerminateMainThreadInner(void * arg)1945 void TerminateMainThreadInner(void *arg)
1946 {
1947 DFileSession *session = (DFileSession *)arg;
1948 DFileSessionSetTerminateFlag(session);
1949 }
1950
/*
 * Start the session's worker threads: main loop, sender, receiver and control, in that order.
 *
 * Returns NSTACKX_EOK when every thread was created. When a creation step fails, the
 * threads started before it are asked to terminate and are joined in reverse order of
 * creation, and NSTACKX_EFAILED is returned.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        DFILE_LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        DFILE_LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        DFILE_LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;

L_ERR_CONTROL_THREAD:
    /*
     * The control thread was never created, so there is nothing to join for it; the
     * receiver is the most recently started thread and is the one to stop here.
     */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /*
     * Signal the outbound queue before joining: presumably the sender may be blocked
     * waiting on it, and it must be woken to notice the terminate flag and exit.
     */
    PostOutboundQueueWait(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post an event to the main loop so that it wakes up and can be joined. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1991
/*
 * Message callback registered with the file manager; context is the owning DFileSession.
 *
 * FILE_MANAGER_INNER_ERROR is logged and escalated as a fatal session event.
 * FILE_MANAGER_IN_PROGRESS triggers a session progress notification. Other message types
 * are ignored.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            DFILE_LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
2004
/*
 * Create the file manager for a session and store it in session->fileManager.
 *
 * Returns NSTACKX_EINVAL when session is NULL, or when isSender is set and connType is
 * neither CONNECT_TYPE_P2P nor CONNECT_TYPE_WLAN. Returns NSTACKX_EFAILED when a key is
 * supplied (keyLen > 0) but its length is not AES_128_KEY_LENGTH or CHACHA20_KEY_LENGTH,
 * key is NULL or crypto support is not included, and also when FileManagerCreate() fails.
 * Returns NSTACKX_EOK on success.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    FileManagerMsgPara para;

    if (session == NULL) {
        DFILE_LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }
    if (isSender && !(connType == CONNECT_TYPE_P2P || connType == CONNECT_TYPE_WLAN)) {
        DFILE_LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }
    if (keyLen > 0) {
        int supportedLen = (keyLen == AES_128_KEY_LENGTH) || (keyLen == CHACHA20_KEY_LENGTH);
        if (!supportedLen || key == NULL) {
            DFILE_LOGE(TAG, "error key or key len");
            return NSTACKX_EFAILED;
        }
        if (!IsCryptoIncluded()) {
            DFILE_LOGE(TAG, "crypto is not opened");
            return NSTACKX_EFAILED;
        }
    }

    /* File manager messages are delivered to FileManagerMsgHandle with the session as context. */
    para.epollfd = session->epollfd;
    para.eventNodeChain = &session->eventNodeChain;
    para.msgReceiver = FileManagerMsgHandle;
    para.context = session;

    session->fileManager = FileManagerCreate(isSender, &para, key, keyLen, connType);
    if (session->fileManager != NULL) {
        return NSTACKX_EOK;
    }
    DFILE_LOGE(TAG, "create filemanager failed");
    return NSTACKX_EFAILED;
}
2038
/*
 * Close both ends of the session's receiver pipe (PIPE_OUT first, then PIPE_IN) and mark
 * each closed end as INVALID_PIPE_DESC. Ends that are already invalid are left alone.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const int pipeEnds[] = { PIPE_OUT, PIPE_IN };
    size_t idx;

    for (idx = 0; idx < sizeof(pipeEnds) / sizeof(pipeEnds[0]); idx++) {
        int end = pipeEnds[idx];
        if (session->receiverPipe[end] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[end]);
        session->receiverPipe[end] = INVALID_PIPE_DESC;
    }
}
2050
/*
 * Install func as the global DFile event callback (g_dfileEventFunc).
 * softObj is accepted but not used.
 */
void DFileSetEvent(void *softObj, DFileEventFunc func)
{
    g_dfileEventFunc = func;
    (void)softObj;
}
2056
NSTACKX_DFileAssembleFunc(void * softObj,const DFileEvent * info)2057 void NSTACKX_DFileAssembleFunc(void *softObj, const DFileEvent *info)
2058 {
2059 if (g_dfileEventFunc != NULL) {
2060 g_dfileEventFunc(softObj, info);
2061 }
2062 }
2063