1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_session.h"
17
18 #include "nstackx_congestion.h"
19 #include "nstackx_dfile.h"
20 #include "nstackx_dfile_config.h"
21 #include "nstackx_dfile_frame.h"
22 #include "nstackx_dfile_send.h"
23 #include "nstackx_epoll.h"
24 #include "nstackx_error.h"
25 #include "nstackx_event.h"
26 #include "nstackx_dfile_log.h"
27 #ifdef MBEDTLS_INCLUDED
28 #include "nstackx_mbedtls.h"
29 #else
30 #include "nstackx_openssl.h"
31 #endif
32 #include "nstackx_timer.h"
33 #include "nstackx_dfile_control.h"
34 #include "nstackx_dfile_mp.h"
35 #include "securec.h"
36 #include "nstackx_util.h"
37 #include "nstackx_dfile_dfx.h"
38
/* Log tag passed as the first argument to the DFILE_LOG* macros in this file. */
#define TAG "nStackXDFile"

/* NOTE(review): the two *_WAIT_TIME_MS values are not used in the reviewed part of the file; unit presumed ms from the name. */
#define DEFAULT_WAIT_TIME_MS 1000
#define MULTI_THREADS_SOCKET_WAIT_TIME_MS 1
/* Setting-negotiation timer period used when CapsTcp(session) is false. */
#define DEFAULT_NEGOTIATE_TIMEOUT 1000
/* Setting-negotiation timer period used when CapsTcp(session) is true. */
#define TCP_NEGOTIATE_TIMEOUT 10000
/* Timer period armed by a server session while a peer's setting negotiation is pending. */
#define MAX_SERVER_NEGOTIATE_VALID_TIMEOUT (600 * DEFAULT_NEGOTIATE_TIMEOUT)
/* Bound on client setting timeouts and on Setting frames a server accepts from one peer. */
#define MAX_NEGOTIATE_TIMEOUT_COUNT 3
/* NOTE(review): the three limits below are not used in the reviewed part of the file. */
#define MAX_UNPROCESSED_READ_EVENT_COUNT 3
#define MAX_RECVBUF_COUNT 130000
#define MAX_NOMEM_PRINT 10000

/* Forward declarations; the definitions are outside the reviewed part of the file. */
static void ReadEventHandle(void *arg);
static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId);
/* File-scope event callback object with external linkage (not static). */
DFileEventFunc g_dfileEventFunc;
54
CreateQueueNode(const uint8_t * frame,size_t length,const struct sockaddr_in * peerAddr,uint8_t socketIndex)55 static QueueNode *CreateQueueNode(const uint8_t *frame, size_t length,
56 const struct sockaddr_in *peerAddr, uint8_t socketIndex)
57 {
58 QueueNode *queueNode = NULL;
59
60 if (frame == NULL || length == 0 || length > NSTACKX_MAX_FRAME_SIZE) {
61 return NULL;
62 }
63
64 queueNode = calloc(1, sizeof(QueueNode));
65 if (queueNode == NULL) {
66 return NULL;
67 }
68
69 queueNode->frame = calloc(1, length);
70 if (queueNode->frame == NULL) {
71 free(queueNode);
72 return NULL;
73 }
74 queueNode->length = length;
75 queueNode->sendLen = 0;
76
77 if (memcpy_s(queueNode->frame, length, frame, length) != EOK) {
78 free(queueNode->frame);
79 free(queueNode);
80 return NULL;
81 }
82 if (peerAddr != NULL) {
83 if (memcpy_s(&queueNode->peerAddr, sizeof(struct sockaddr_in), peerAddr, sizeof(struct sockaddr_in)) != EOK) {
84 free(queueNode->frame);
85 free(queueNode);
86 return NULL;
87 }
88 }
89 queueNode->socketIndex = socketIndex;
90
91 return queueNode;
92 }
93
/* Release a queue node together with the frame buffer it owns; NULL is ignored. */
void DestroyQueueNode(QueueNode *queueNode)
{
    if (queueNode == NULL) {
        return;
    }
    free(queueNode->frame);
    free(queueNode);
}
101
/*
 * Deliver a message to the session's msgReceiver callback.
 *
 * The callback receives session->sessionId, msgType and msg unchanged. When the
 * session or its callback is NULL the message is dropped and only logged.
 */
void NotifyMsgRecver(const DFileSession *session, DFileMsgType msgType, const DFileMsg *msg)
{
    if (session == NULL) {
        DFILE_LOGI(TAG, "session is NULL");
    } else if (session->msgReceiver == NULL) {
        DFILE_LOGI(TAG, "msgReceiver is NULL");
    } else {
        session->msgReceiver(session->sessionId, msgType, msg);
    }
}
116
/*
 * Reset the session's transfer-rate statistics when nothing is in flight.
 *
 * Nothing is done while a client session still has queued file lists (pending,
 * and small lists when NSTACKX_SMALL_FILE_SUPPORT is defined) or while the
 * session's trans chain is non-empty. Otherwise the byte and trans counters are
 * cleared and startTs is set to the current monotonic time.
 */
void CalculateSessionTransferRatePrepare(DFileSession *session)
{
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (!ListIsEmpty(&session->pendingFileLists)) {
            return;
        }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
        if (!ListIsEmpty(&session->smallFileLists)) {
            return;
        }
#endif
    }
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }

    DFILE_LOGI(TAG, "begin to calculate transfer rate");
    session->bytesTransferred = 0;
    session->transCount = 0;
    ClockGetTime(CLOCK_MONOTONIC, &session->startTs);
}
137
#ifdef NSTACKX_SMALL_FILE_SUPPORT
/*
 * Try to start a transfer for a queued small file list.
 *
 * Small lists are only considered when a normal file list is already being
 * processed or none is pending. Lists are popped from smallFileLists until
 * DFileStartTrans succeeds; each list that fails to start is reported through
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 *
 * Returns NSTACKX_TRUE when a transfer was started, NSTACKX_FALSE otherwise.
 */
static int32_t SendSmallList(DFileSession *session)
{
    if (!(session->fileListProcessingCnt > 0 || session->fileListPendingCnt == 0)) {
        return NSTACKX_FALSE;
    }

    while (!ListIsEmpty(&session->smallFileLists)) {
        FileListInfo *info = (FileListInfo *)ListPopFront(&session->smallFileLists);
        session->smallListPendingCnt--;
        if (info == NULL) {
            continue;
        }

        int32_t ret = DFileStartTrans(session, info);
        if (ret == NSTACKX_EOK) {
            return NSTACKX_TRUE;
        }
        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", ret);

        /* Tell the application which list could not be sent, then drop it. */
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = ret;
        failMsg.fileList.files = (const char **)info->files;
        failMsg.fileList.fileNum = info->fileNum;
        failMsg.fileList.userData = info->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(info);
    }
    return NSTACKX_FALSE;
}
#endif
167
/*
 * Start a transfer for the next pending file list.
 *
 * Lists are popped from pendingFileLists until DFileStartTrans succeeds or the
 * queue is empty; each list that fails to start is reported through
 * DFILE_ON_FILE_SEND_FAIL and destroyed.
 */
static void SendPendingList(DFileSession *session)
{
    while (!ListIsEmpty(&session->pendingFileLists)) {
        FileListInfo *info = (FileListInfo *)ListPopFront(&session->pendingFileLists);
        session->fileListPendingCnt--;
        if (info == NULL) {
            continue;
        }

        int32_t ret = DFileStartTrans(session, info);
        if (ret == NSTACKX_EOK) {
            return;
        }
        DFILE_LOGE(TAG, "DFileStartTrans fail, error: %d", ret);

        /* Tell the application which list could not be sent, then drop it. */
        DFileMsg failMsg;
        (void)memset_s(&failMsg, sizeof(failMsg), 0, sizeof(failMsg));
        failMsg.errorCode = ret;
        failMsg.fileList.files = (const char **)info->files;
        failMsg.fileList.fileNum = info->fileNum;
        failMsg.fileList.userData = info->userData;
        NotifyMsgRecver(session, DFILE_ON_FILE_SEND_FAIL, &failMsg);
        DestroyFileListInfo(info);
    }
}
192
/*
 * Kick off the next queued file list of the session.
 *
 * With NSTACKX_SMALL_FILE_SUPPORT a small list is tried first, and the pending
 * queue is used only when no small list was started; without it only the
 * pending queue is used. The queue counters are logged beforehand.
 */
static void SendSmallAndPendingList(DFileSession *session)
{
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u smallListPendingCnt %u smallListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt, session->smallListPendingCnt,
        session->smallListProcessingCnt);
    if (SendSmallList(session) == NSTACKX_TRUE) {
        return;
    }
#else
    DFILE_LOGI(TAG, "fileListPendingCnt %u fileListProcessingCnt %u",
        session->fileListPendingCnt, session->fileListProcessingCnt);
#endif
    SendPendingList(session);
}
208
/*
 * Report session-level progress to the application.
 *
 * Total and transferred byte counts are read from the session's file manager.
 * DFILE_ON_SESSION_IN_PROGRESS is sent only when both reads succeed and the
 * transferred count is non-zero and does not exceed the total.
 */
void NoticeSessionProgress(DFileSession *session)
{
    DFileMsg progress;
    (void)memset_s(&progress, sizeof(progress), 0, sizeof(progress));

    if (FileManagerGetTotalBytes(session->fileManager, &progress.transferUpdate.totalBytes) != NSTACKX_EOK) {
        return;
    }
    if (FileManagerGetBytesTransferred(session->fileManager, &progress.transferUpdate.bytesTransferred) !=
        NSTACKX_EOK) {
        return;
    }
    if (progress.transferUpdate.bytesTransferred <= progress.transferUpdate.totalBytes &&
        progress.transferUpdate.bytesTransferred > 0) {
        NotifyMsgRecver(session, DFILE_ON_SESSION_IN_PROGRESS, &progress);
    }
}
220
/*
 * Fill msg->transferUpdate for a trans that was received, sent or has failed.
 *
 * Only FILE_RECEIVED, FILE_RECEIVE_FAIL, FILE_SEND_FAIL and FILE_SENT messages
 * are handled; any other type or a NULL argument leaves msg untouched. For the
 * two success types a non-zero total from DFileTransGetTotalBytes is reported
 * as fully transferred; otherwise both counters come from the file manager,
 * and only the trans id is filled when that query fails. For FILE_SENT a
 * DFILE_ON_TRANS_IN_PROGRESS notification is sent, with the trans id replaced
 * by vtransRealTransId when the file list has vtransFlag set.
 */
static void UpdateMsgProcessInfo(const DFileSession *session, struct DFileTrans *dFileTrans,
    DFileTransMsgType msgType, DFileTransMsg *msg)
{
    if (session == NULL || dFileTrans == NULL || msg == NULL) {
        return;
    }
    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_RECEIVED:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
        case DFILE_TRANS_MSG_FILE_SENT:
            break;
        default:
            return;
    }

    msg->transferUpdate.transId = dFileTrans->transId;

    uint64_t total = 0;
    uint64_t transferred = 0;
    if (msgType == DFILE_TRANS_MSG_FILE_RECEIVED || msgType == DFILE_TRANS_MSG_FILE_SENT) {
        total = DFileTransGetTotalBytes(dFileTrans);
    }
    if (total > 0) {
        /* A finished trans with a known size counts as completely transferred. */
        transferred = total;
    } else if (FileManagerGetTransUpdateInfo(session->fileManager, dFileTrans->transId, &total, &transferred) !=
        NSTACKX_EOK) {
        return;
    }

    msg->transferUpdate.totalBytes = total;
    msg->transferUpdate.bytesTransferred = transferred;
    if (msgType != DFILE_TRANS_MSG_FILE_SENT) {
        return;
    }
    if (dFileTrans->fileList->vtransFlag) {
        msg->fileList.transId = dFileTrans->fileList->vtransRealTransId;
        msg->transferUpdate.transId = dFileTrans->fileList->vtransRealTransId;
    }
    NotifyMsgRecver(session, DFILE_ON_TRANS_IN_PROGRESS, msg);
}
259
/*
 * Post the semaphores that sending threads wait on.
 *
 * outboundQueueWait[socketIndex] is always posted. For a sender with more than
 * one client send thread, sendWait of the first clientSendThreadNum - 1
 * entries of sendThreadPara is posted as well.
 */
static void WakeSendThread(DFileSession *session, uint8_t isSender, uint8_t socketIndex)
{
    SemPost(&session->outboundQueueWait[socketIndex]);
    if (!isSender || session->clientSendThreadNum <= 1) {
        return;
    }

    uint16_t idx = 0;
    while (idx < session->clientSendThreadNum - 1) {
        SemPost(&session->sendThreadPara[idx].sendWait);
        idx++;
    }
}
269
/*
 * Account a finished trans and, once the session is idle, report its rate.
 *
 * Only FILE_SENT and END messages are counted. totalBytes is added to the
 * session byte counter (saturating at UINT64_MAX) and the trans counter is
 * incremented. When the trans chain is empty and a client session has no
 * queued file lists, the rate since session->startTs is computed, logged,
 * reported through DFILE_ON_SESSION_TRANSFER_RATE (truncated to uint32_t) and
 * passed to TransferCompleteEvent. Nothing is reported when the elapsed time
 * is 0 ms.
 */
static void CalculateSessionTransferRate(DFileSession *session, uint64_t totalBytes, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_SENT && msgType != DFILE_TRANS_MSG_END) {
        return;
    }
    if (totalBytes <= UINT64_MAX - session->bytesTransferred) {
        session->bytesTransferred += totalBytes;
    } else {
        session->bytesTransferred = UINT64_MAX;
    }
    session->transCount++;
    if (!ListIsEmpty(&session->dFileTransChain)) {
        return;
    }
#ifdef NSTACKX_SMALL_FILE_SUPPORT
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && (!ListIsEmpty(&session->pendingFileLists)
        || !ListIsEmpty(&session->smallFileLists))) {
        return;
    }
#else
    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && !ListIsEmpty(&session->pendingFileLists)) {
        return;
    }
#endif
    struct timespec endTs;
    ClockGetTime(CLOCK_MONOTONIC, &endTs);

    uint32_t spendTime = GetTimeDiffMs(&endTs, &session->startTs);
    if (spendTime == 0) {
        /* Avoid dividing by zero for transfers shorter than the clock resolution. */
        return;
    }
    const double rate = 1.0 * session->bytesTransferred / DFILE_MEGABYTES * MSEC_TICKS_PER_SEC / spendTime;
    /* %llu requires unsigned long long; the 64-bit counter is cast so the argument matches on every ABI. */
    DFILE_LOGI(TAG, "Total %u trans, %llu bytes, used %u ms. rate %.2f MB/s",
        session->transCount, (unsigned long long)session->bytesTransferred, spendTime, rate);
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    msgData.rate = (uint32_t)rate;
    NotifyMsgRecver(session, DFILE_ON_SESSION_TRANSFER_RATE, &msgData);

    TransferCompleteEvent(rate);
}
311
/*
 * Tear down a trans once it has reached a terminal message.
 *
 * Terminal messages are FILE_RECEIVE_FAIL, FILE_SENT, FILE_SEND_FAIL and END;
 * any other type is ignored. The trans id is marked STATE_TRANS_DONE, the
 * owning peer's currentTransCount is decremented, and the trans is unlinked
 * and destroyed. A client session then decrements the matching processing
 * counter and starts the next queued list. Finally the trans' byte total is
 * handed to CalculateSessionTransferRate.
 */
static void CheckTransDone(DFileSession *session, struct DFileTrans *dFileTrans, DFileTransMsgType msgType)
{
    if (msgType != DFILE_TRANS_MSG_FILE_RECEIVE_FAIL && msgType != DFILE_TRANS_MSG_FILE_SENT &&
        msgType != DFILE_TRANS_MSG_FILE_SEND_FAIL && msgType != DFILE_TRANS_MSG_END) {
        return;
    }

    /* Everything needed from the trans is read before it is destroyed. */
    uint8_t isSmallList = dFileTrans->fileList->smallFlag;
    if (SetTransIdState(session, dFileTrans->transId, STATE_TRANS_DONE) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans id state fail");
    }
    PeerInfo *owner = (PeerInfo *)dFileTrans->context;
    owner->currentTransCount--;
    ListRemoveNode(&dFileTrans->list);
    uint64_t doneBytes = DFileTransGetTotalBytes(dFileTrans);
    DFileTransDestroy(dFileTrans);

    if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
        if (isSmallList == NSTACKX_TRUE && session->smallListProcessingCnt > 0) {
            session->smallListProcessingCnt--;
        } else if (session->fileListProcessingCnt > 0) {
            session->fileListProcessingCnt--;
        }
        SendSmallAndPendingList(session);
    }
    CalculateSessionTransferRate(session, doneBytes, msgType);
}
335
/*
 * Message callback installed on every DFileTrans created by this session.
 *
 * The progress fields of msg are refreshed first. Then the trans-level message
 * is handled: most types are forwarded to the application as the matching
 * DFILE_ON_* event, SEND_DATA wakes the sending threads, SEND_ACK calls
 * ProcessSessionTrans and unknown types are logged. Finally CheckTransDone
 * tears the trans down when the message was terminal.
 */
static void DTransMsgReceiver(struct DFileTrans *dFileTrans, DFileTransMsgType msgType,
    DFileTransMsg *msg)
{
    PeerInfo *peer = dFileTrans->context;
    DFileSession *session = peer->session;
    uint8_t needNotify = NSTACKX_TRUE;
    DFileMsgType event = DFILE_ON_TRANS_IN_PROGRESS;

    UpdateMsgProcessInfo(session, dFileTrans, msgType, msg);

    switch (msgType) {
        case DFILE_TRANS_MSG_FILE_LIST_RECEIVED:
            event = DFILE_ON_FILE_LIST_RECEIVED;
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED: /* Receiver receive all the file data */
            event = DFILE_ON_FILE_RECEIVE_SUCCESS;
            break;
        case DFILE_TRANS_MSG_FILE_RECEIVED_TO_FAIL:
        case DFILE_TRANS_MSG_FILE_RECEIVE_FAIL:
            event = DFILE_ON_FILE_RECEIVE_FAIL;
            break;
        case DFILE_TRANS_MSG_FILE_SENT: /* Sender send TRANSFER DONE ACK frame and come to end */
            NoticeSessionProgress(session);
            event = DFILE_ON_FILE_SEND_SUCCESS;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_FAIL:
            event = DFILE_ON_FILE_SEND_FAIL;
            break;
        case DFILE_TRANS_MSG_IN_PROGRESS:
            event = DFILE_ON_TRANS_IN_PROGRESS;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_DATA:
            WakeSendThread(session, dFileTrans->isSender, peer->socketIndex);
            needNotify = NSTACKX_FALSE;
            break;
        case DFILE_TRANS_MSG_FILE_SEND_ACK:
            ProcessSessionTrans(session, dFileTrans->transId);
            needNotify = NSTACKX_FALSE;
            break;
        default:
            DFILE_LOGI(TAG, "transId %u, recv DFileTrans event %d", dFileTrans->transId, msgType);
            needNotify = NSTACKX_FALSE;
            break;
    }
    if (needNotify) {
        NotifyMsgRecver(session, event, msg);
    }

    CheckTransDone(session, dFileTrans, msgType);
}
377
ServerSettingTimeoutHandle(void * data)378 static void ServerSettingTimeoutHandle(void *data)
379 {
380 PeerInfo *peerInfo = data;
381 ListRemoveNode(&peerInfo->list);
382 TimerDelete(peerInfo->settingTimer);
383 peerInfo->session->peerInfoCnt--;
384 free(peerInfo);
385 DFILE_LOGI(TAG, "DFileServer Setting Negotationion timeout");
386 }
387
ClientSettingTimeoutHandle(void * data)388 static void ClientSettingTimeoutHandle(void *data)
389 {
390 PeerInfo *peerInfo = data;
391 uint8_t cnt = peerInfo->settingTimeoutCnt++;
392 DFileMsg msgData;
393 uint32_t timeout = CapsTcp(peerInfo->session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
394 (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
395 if (cnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
396 TimerDelete(peerInfo->settingTimer);
397 peerInfo->settingTimer = NULL;
398 peerInfo->settingTimeoutCnt = 0;
399 msgData.errorCode = NSTACKX_EFAILED;
400 DFILE_LOGI(TAG, "DFileClient Setting Negotationion timeout");
401 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
402 } else {
403 DFileSessionSendSetting(peerInfo);
404 DFILE_LOGI(TAG, "Client Setting Negotationion timeout %u times", peerInfo->settingTimeoutCnt);
405 if (TimerSetTimeout(peerInfo->settingTimer, timeout, NSTACKX_FALSE) != NSTACKX_EOK) {
406 msgData.errorCode = NSTACKX_EFAILED;
407 NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_FAIL, &msgData);
408 DFILE_LOGE(TAG, "Timer setting timer fail");
409 }
410 }
411 }
412
SearchDFileTransNode(List * dFileTransChain,uint16_t transId)413 static DFileTrans *SearchDFileTransNode(List *dFileTransChain, uint16_t transId)
414 {
415 List *pos = NULL;
416 DFileTrans *trans = NULL;
417
418 if (dFileTransChain == NULL || transId == 0) {
419 return NULL;
420 }
421
422 LIST_FOR_EACH(pos, dFileTransChain) {
423 trans = (DFileTrans *)pos;
424 if (trans->transId == transId) {
425 return trans;
426 }
427 }
428 return NULL;
429 }
430
SearchPeerInfoNode(const DFileSession * session,const struct sockaddr_in * peerAddr)431 static PeerInfo *SearchPeerInfoNode(const DFileSession *session, const struct sockaddr_in *peerAddr)
432 {
433 List *pos = NULL;
434 PeerInfo *peerInfo = NULL;
435 LIST_FOR_EACH(pos, &session->peerInfoChain) {
436 peerInfo = (PeerInfo *)pos;
437 if (memcmp(&peerInfo->dstAddr, peerAddr, sizeof(struct sockaddr_in)) == 0 &&
438 peerInfo->session->sessionId == session->sessionId) {
439 return peerInfo;
440 }
441 }
442 return NULL;
443 }
444
/*
 * Encode a Setting frame for the peer and send it.
 *
 * The frame carries the peer's connection type and local MTU, the session's
 * Link Sequence capability bit and the recv-feedback check bit. Cipher
 * capabilities and device bits are added only when the file manager has a key
 * (keyLen != 0).
 *
 * If the peer has no setting timer yet, one is started first: a client uses
 * the negotiate timeout with ClientSettingTimeoutHandle, a server uses
 * MAX_SERVER_NEGOTIATE_VALID_TIMEOUT with ServerSettingTimeoutHandle. When the
 * timer cannot be created nothing is sent; a client additionally reports
 * DFILE_ON_CONNECT_FAIL. Callers detect this through
 * peerInfo->settingTimer == NULL.
 *
 * A write result that is neither the full frame length nor NSTACKX_EAGAIN is
 * reported as DFILE_ON_CONNECT_FAIL.
 */
void DFileSessionSendSetting(PeerInfo *peerInfo)
{
    DFileSession *session = peerInfo->session;
    uint32_t timeout = CapsTcp(session) ? TCP_NEGOTIATE_TIMEOUT : DEFAULT_NEGOTIATE_TIMEOUT;
    uint8_t buf[NSTACKX_DEFAULT_FRAME_SIZE];
    size_t frameLen = 0;
    DFileMsg data;
    SettingFrame settingFramePara;

    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    /*
     * Only some fields are assigned below; zero the rest so the encoder never
     * reads indeterminate stack contents.
     */
    (void)memset_s(&settingFramePara, sizeof(settingFramePara), 0, sizeof(settingFramePara));
    settingFramePara.connType = peerInfo->connType;
    settingFramePara.mtu = peerInfo->localMtu;
    settingFramePara.capability = session->capability & NSTACKX_CAPS_LINK_SEQUENCE;
    settingFramePara.dataFrameSize = 0;
    settingFramePara.capsCheck = NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    if (session->fileManager->keyLen) {
        DFileGetCipherCaps(session, &settingFramePara);
        settingFramePara.deviceBits = DFileGetDeviceBits();
    }
    EncodeSettingFrame(buf, NSTACKX_DEFAULT_FRAME_SIZE, &frameLen, &settingFramePara);

    if (peerInfo->settingTimer == NULL) {
        /* First transmission for this peer: arm the negotiation timer before sending. */
        if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
            peerInfo->settingTimer = TimerStart(session->epollfd, timeout,
                NSTACKX_FALSE, ClientSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                DFILE_LOGE(TAG, "setting timmer creat fail");
                data.errorCode = NSTACKX_EFAILED;
                NotifyMsgRecver(session, DFILE_ON_CONNECT_FAIL, &data);
                return;
            }
        } else {
            peerInfo->settingTimer = TimerStart(session->epollfd, MAX_SERVER_NEGOTIATE_VALID_TIMEOUT,
                NSTACKX_FALSE, ServerSettingTimeoutHandle, peerInfo);
            if (peerInfo->settingTimer == NULL) {
                return;
            }
        }
    }

    int32_t ret = DFileWriteHandle(buf, frameLen, peerInfo);
    if (ret != (int32_t)frameLen && ret != NSTACKX_EAGAIN) {
        data.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(session, DFILE_ON_CONNECT_FAIL, &data);
    }
}
496
/*
 * Apply a negotiated DFileConfig to a peer and to the session's file manager.
 *
 * The peer's maximum send rate and data frame size are copied from the config,
 * its loss-rate history and fast-start counter are cleared, and measureBefore
 * is set to the current monotonic time. The initial send rate is the
 * configured rate divided by the WLAN or P2P divisor. Outside LiteOS builds a
 * WLAN peer without a known wifi rate gets a rate derived from
 * NSTACKX_WLAN_INIT_RATE instead. The result is never below
 * NSTACKX_MIN_SENDRATE. The file manager is told the frame length and, on a
 * server session, the connection type; failures there are only logged.
 */
static void SetDFileSessionConfig(DFileSession *session, DFileConfig *dFileConfig, uint16_t connType,
    PeerInfo *peerInfo)
{
    const uint8_t isWlan = (connType == CONNECT_TYPE_WLAN);

    /* Reset the per-peer rate-control state. */
    (void)memset_s(peerInfo->integralLossRate, INTEGRAL_TIME * sizeof(double), 0, INTEGRAL_TIME * sizeof(double));
    peerInfo->fastStartCounter = 0;
    ClockGetTime(CLOCK_MONOTONIC, &peerInfo->measureBefore);

    peerInfo->maxSendRate = dFileConfig->sendRate;
    peerInfo->dataFrameSize = dFileConfig->dataFrameSize;
    if (isWlan) {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_WLAN_INIT_SPEED_DIVISOR;
    } else {
        peerInfo->sendRate = dFileConfig->sendRate / NSTACKX_P2P_INIT_SPEED_DIVISOR;
    }

#ifndef NSTACKX_WITH_LITEOS
    if (isWlan && !peerInfo->gotWifiRate) {
        peerInfo->sendRate = (uint16_t)(NSTACKX_WLAN_INIT_RATE / MSEC_TICKS_PER_SEC * DATA_FRAME_SEND_INTERVAL_MS
            / dFileConfig->dataFrameSize + NSTACKX_WLAN_COMPENSATION_RATE);
    }
#endif
    if (peerInfo->sendRate < NSTACKX_MIN_SENDRATE) {
        peerInfo->sendRate = NSTACKX_MIN_SENDRATE;
    }

    if (FileManagerSetMaxFrameLength(session->fileManager, peerInfo->dataFrameSize) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set max frame length");
    }

    DFILE_LOGI(TAG, "connType is %u set sendrate is %u maxSendRate is %u peerInfo->dataFrameSize is %u",
        connType, peerInfo->sendRate, peerInfo->maxSendRate, peerInfo->dataFrameSize);
    if (session->sessionType == DFILE_SESSION_TYPE_SERVER &&
        FileManagerSetRecvParaWithConnType(session->fileManager, connType) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "failed to set recv para");
    }
}
533
/*
 * Store the negotiation state and the values of a decoded Setting frame in
 * the peer record. mtuInuse becomes the smaller of the local and the remote
 * MTU.
 */
static void DFileSessionSetPeerInfo(PeerInfo *peerInfo, SettingState state, SettingFrame *settingFrame)
{
    peerInfo->state = state;
    peerInfo->connType = settingFrame->connType;
    peerInfo->remoteSessionId = settingFrame->header.sessionId;
    peerInfo->mtu = settingFrame->mtu;

    if (peerInfo->localMtu < peerInfo->mtu) {
        peerInfo->mtuInuse = peerInfo->localMtu;
    } else {
        peerInfo->mtuInuse = peerInfo->mtu;
    }
}
543
/*
 * Client-side handling of the server's Setting frame.
 *
 * Frames that fail to decode, advertise an MTU of 0 or come from an address
 * without a peer record are dropped. Otherwise the negotiation timer is
 * stopped, the peer is marked SETTING_NEGOTIATED, the negotiated MTU is pushed
 * to every trans of the session and DFILE_ON_CONNECT_SUCCESS is reported. The
 * transfer configuration for the negotiated MTU and connection type is
 * applied when GetDFileConfig succeeds. Link Sequence is disabled for the
 * session when the server did not set that capability, and the cipher type is
 * chosen from the server's frame.
 */
static void DFileSessionHandleClientSetting(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    List *pos = NULL;
    SettingFrame hostSettingFrame;
    (void)memset_s(&hostSettingFrame, sizeof(hostSettingFrame), 0, sizeof(hostSettingFrame));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    /* unsupport version */
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &hostSettingFrame) != NSTACKX_EOK || hostSettingFrame.mtu == 0) {
        return;
    }
    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer setting, maybe be attacked");
        return;
    }
    peerInfo->remoteDFileVersion = hostSettingFrame.dFileVersion;
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    DFileSessionSetPeerInfo(peerInfo, SETTING_NEGOTIATED, &hostSettingFrame);
    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        DFileTrans *trans = (DFileTrans *)pos;
        if (DFileTransSetMtu(trans, peerInfo->mtuInuse) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "set trans mtu failed");
        }
    }
    DFileMsg data;
    (void)memset_s(&data, sizeof(data), 0, sizeof(data));
    data.errorCode = NSTACKX_EOK;
    NotifyMsgRecver(peerInfo->session, DFILE_ON_CONNECT_SUCCESS, &data);

    DFileConfig dFileConfig;
    (void)memset_s(&dFileConfig, sizeof(dFileConfig), 0, sizeof(dFileConfig));
    if (GetDFileConfig(&dFileConfig, peerInfo->mtuInuse, hostSettingFrame.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &dFileConfig, hostSettingFrame.connType, peerInfo);
    }
    if (hostSettingFrame.capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        /* The capability bit is set here, so the log must not claim it is unsupported. */
        DFILE_LOGI(TAG, "server replies using Link Sequence");
    } else {
        DFILE_LOGI(TAG, "server replies using normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }
    DFileChooseCipherType(&hostSettingFrame, session);
}
587
/*
 * Local MTU for a socket protocol: NSTACKX_DEFAULT_FRAME_SIZE for UDP and TCP,
 * 0 for D2D (logged as unsupported) and for any other value.
 */
static uint16_t DFileGetMTU(SocketProtocol protocol)
{
    /* for udp, return NSTACKX_DEFAULT_FRAME_SIZE, for D2D, need call D2D MTU interface */
    switch (protocol) {
        case NSTACKX_PROTOCOL_UDP:
        case NSTACKX_PROTOCOL_TCP:
            return NSTACKX_DEFAULT_FRAME_SIZE;
        case NSTACKX_PROTOCOL_D2D:
            DFILE_LOGE(TAG, "d2d not support");
            return 0;
        default:
            return 0;
    }
}
602
CreatePeerInfo(DFileSession * session,const struct sockaddr_in * dstAddr,uint16_t peerMtu,uint16_t connType,uint8_t socketIndex)603 PeerInfo *CreatePeerInfo(DFileSession *session, const struct sockaddr_in *dstAddr, uint16_t peerMtu,
604 uint16_t connType, uint8_t socketIndex)
605 {
606 if (session->peerInfoCnt >= MAX_PEERINFO_SIZE) {
607 return NULL;
608 }
609 PeerInfo *peerInfo = calloc(1, sizeof(PeerInfo));
610 if (peerInfo == NULL) {
611 return NULL;
612 }
613 peerInfo->session = session;
614 peerInfo->dstAddr = *dstAddr;
615 peerInfo->connType = connType;
616 peerInfo->socketIndex = socketIndex;
617 peerInfo->localMtu = DFileGetMTU(session->protocol);
618 session->peerInfoCnt++;
619 peerInfo->gotWifiRate = 0;
620 peerInfo->ackInterval = NSTACKX_INIT_ACK_INTERVAL;
621 peerInfo->rateStateInterval = NSTACKX_INIT_RATE_STAT_INTERVAL;
622
623 if (GetInterfaceNameByIP(session->socket[socketIndex]->srcAddr.sin_addr.s_addr,
624 peerInfo->localInterfaceName, sizeof(peerInfo->localInterfaceName)) != NSTACKX_EOK) {
625 DFILE_LOGE(TAG, "GetInterfaceNameByIP failed %d", errno);
626 }
627
628 if (peerMtu == 0) {
629 return peerInfo;
630 }
631 peerInfo->mtu = peerMtu;
632 peerInfo->mtuInuse = (peerInfo->localMtu < peerInfo->mtu) ? peerInfo->localMtu : peerInfo->mtu;
633 return peerInfo;
634 }
635
/*
 * Server-side evaluation of the capability bits in a client's Setting frame.
 *
 * Link Sequence stays enabled for the session only when the client set
 * NSTACKX_CAPS_LINK_SEQUENCE in the frame's capability field. Recv feedback is
 * recorded in session->capsCheck according to the frame's capsCheck field.
 */
static void HandleLinkSeqCap(DFileSession *session, SettingFrame *hostSettingFrame)
{
    if (hostSettingFrame->capability & NSTACKX_CAPS_LINK_SEQUENCE) {
        DFILE_LOGI(TAG, "client wants to enable Link Sequence");
    } else {
        DFILE_LOGI(TAG, "client wants to use normal ACK");
        session->capability &= ~NSTACKX_CAPS_LINK_SEQUENCE;
    }

    /*
     * The sender places NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK in capsCheck and
     * masks capability down to the Link Sequence bit, so capsCheck is the
     * field to test here.
     */
    if (hostSettingFrame->capsCheck & NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK) {
        DFILE_LOGI(TAG, "client support recv feedback");
        session->capsCheck |= NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    } else {
        DFILE_LOGI(TAG, "client do not support recv feedback");
        session->capsCheck &= ~NSTACKX_INTERNAL_CAPS_RECV_FEEDBACK;
    }
}
653
AllocPeerInfo(DFileSession * session,const struct sockaddr_in * peerAddr,const SettingFrame * frame,uint8_t socketIndex)654 static PeerInfo *AllocPeerInfo(DFileSession *session, const struct sockaddr_in *peerAddr, const SettingFrame *frame,
655 uint8_t socketIndex)
656 {
657 PeerInfo *peerInfo = NULL;
658
659 peerInfo = CreatePeerInfo(session, peerAddr, frame->mtu, frame->connType, socketIndex);
660 if (peerInfo == NULL) {
661 return NULL;
662 }
663
664 peerInfo->remoteSessionId = frame->header.sessionId;
665 peerInfo->state = SETTING_NEGOTIATING;
666 DFileSessionSendSetting(peerInfo);
667 if (peerInfo->settingTimer == NULL) {
668 free(peerInfo);
669 session->peerInfoCnt--;
670 return NULL;
671 }
672 ListInsertTail(&peerInfo->session->peerInfoChain, &peerInfo->list);
673 return peerInfo;
674 }
675
/*
 * Server-side handling of a client's Setting frame.
 *
 * Frames that fail to decode or advertise an MTU of 0 are dropped. For a known
 * peer the stored values are refreshed and the Setting reply is sent again,
 * unless MAX_NEGOTIATE_TIMEOUT_COUNT settings were already handled for it. An
 * unknown peer gets a new record through AllocPeerInfo. Afterwards the
 * per-peer setting counter is incremented, the remote DFile version is
 * stored, the transfer configuration is applied when GetDFileConfig succeeds,
 * and the capability bits and cipher type are taken from the frame.
 */
static void DFileSessionHandleServerSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    SettingFrame clientSetting;
    DFileConfig config;
    (void)memset_s(&clientSetting, sizeof(clientSetting), 0, sizeof(clientSetting));
    (void)memset_s(&config, sizeof(config), 0, sizeof(config));
    DFILE_LOGI(TAG, "handle Setting Frame, DFileSessionType %u", session->sessionType);
    if (DecodeSettingFrame((SettingFrame *)dFileFrame, &clientSetting) != NSTACKX_EOK || clientSetting.mtu == 0) {
        return;
    }

    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        peer = AllocPeerInfo(session, peerAddr, &clientSetting, socketIndex);
        if (peer == NULL) {
            return;
        }
    } else if (peer->settingTimeoutCnt >= MAX_NEGOTIATE_TIMEOUT_COUNT) {
        DFILE_LOGE(TAG, "receive more than %d Setting for one peer, drop", MAX_NEGOTIATE_TIMEOUT_COUNT);
        return;
    } else {
        DFileSessionSetPeerInfo(peer, SETTING_NEGOTIATING, &clientSetting);
        DFileSessionSendSetting(peer);
    }

    peer->settingTimeoutCnt++;
    DFILE_LOGI(TAG, "DFileServer response Setting Frame. count %u", peer->settingTimeoutCnt);
    peer->remoteDFileVersion = clientSetting.dFileVersion;
    if (GetDFileConfig(&config, peer->mtuInuse, clientSetting.connType) == NSTACKX_EOK) {
        SetDFileSessionConfig(session, &config, clientSetting.connType, peer);
    }
    HandleLinkSeqCap(session, &clientSetting);
    DFileChooseCipherType(&clientSetting, session);
}
715
/*
 * Dispatch a received Setting frame to the server or client handler according
 * to the session type; other session types ignore the frame.
 */
static void DFileSessionHandleSetting(DFileSession *session, DFileFrame *dFileFrame,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    switch (session->sessionType) {
        case DFILE_SESSION_TYPE_SERVER:
            DFileSessionHandleServerSetting(session, dFileFrame, peerAddr, socketIndex);
            break;
        case DFILE_SESSION_TYPE_CLIENT:
            DFileSessionHandleClientSetting(session, dFileFrame, peerAddr);
            break;
        default:
            break;
    }
}
727
/*
 * React to a peer RST carrying NSTACKX_DFILE_WITHOUT_SETTING_ERROR.
 *
 * An RST from an address without a peer record is ignored. Otherwise the MTU
 * of every trans in the session is set to 0 and setting negotiation is
 * started again for that peer: its timer is deleted, its state goes back to
 * SETTING_NEGOTIATING and a Setting frame is sent.
 */
static void HandleWithoutSettingError(DFileSession *session, const struct sockaddr_in *peerAddr)
{
    List *pos = NULL;

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "recv unknown peer rst, maybe be attacked");
        return;
    }

    LIST_FOR_EACH(pos, &session->dFileTransChain) {
        /*
         * when client recv peer RST NSTACKX_DFILE_WITHOUT_SETTING_ERROR, Set Trans MTU 0,
         * and will reset after new setting negotiation.
         */
        if (DFileTransSetMtu((DFileTrans *)pos, 0) != NSTACKX_EOK) {
            DFILE_LOGE(TAG, "DFileTransSetMtu(trans, 0) failed %d", errno);
        }
    }

    /*
     * SearchPeerInfoNode already matched on address and session id, so the
     * peer found above is the one to renegotiate; no second scan is needed.
     */
    TimerDelete(peerInfo->settingTimer);
    peerInfo->settingTimer = NULL;
    peerInfo->state = SETTING_NEGOTIATING;
    DFILE_LOGD(TAG, "Send Setting Frame");
    DFileSessionSendSetting(peerInfo);
}
763
/*
 * Handle a received RST frame.
 *
 * Frames that fail to decode are dropped. Only the error code
 * NSTACKX_DFILE_WITHOUT_SETTING_ERROR is acted upon; every other code is
 * logged as unsupported.
 */
static void DFileSessionHandleRst(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    uint16_t rstCode = 0;
    if (DecodeRstFrame((RstFrame *)dFileFrame, &rstCode, NULL, NULL) != NSTACKX_EOK) {
        return;
    }

    uint16_t hostTransId = ntohs(dFileFrame->header.transId);
    DFILE_LOGD(TAG, "handle RST (%hu) frame, transId %hu", rstCode, hostTransId);

    if (rstCode == NSTACKX_DFILE_WITHOUT_SETTING_ERROR) {
        HandleWithoutSettingError(session, peerAddr);
    } else {
        DFILE_LOGE(TAG, "Unspported error code %hu", rstCode);
    }
}
783
/*
 * Apply a decoded back-pressure indication to the first `count` entries of
 * session->stopSendCnt, under session->backPressLock.
 *
 * recvListOverIo == 1 increments each counter; any other value resets each
 * counter to 0. Nothing is changed when the lock cannot be taken.
 */
static void DFileSessionResolveBackPress(DFileSession *session, DataBackPressure backPress, uint32_t count)
{
    if (PthreadMutexLock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
        return;
    }

    for (uint32_t idx = 0; idx < count; idx++) {
        if (backPress.recvListOverIo == 1) {
            session->stopSendCnt[idx]++;
        } else {
            session->stopSendCnt[idx] = 0;
        }
    }

    if (PthreadMutexUnlock(&session->backPressLock) != 0) {
        DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
    }
}
810
/*
 * Handle a received back-pressure frame.
 *
 * The frame is ignored when the sender has no peer record or the frame fails
 * to decode. Otherwise the indication is applied to the stop-send counters of
 * all client send threads and the decoded values are logged.
 */
static void DFileSessionHandleBackPressure(DFileSession *session, const DFileFrame *dFileFrame,
    const struct sockaddr_in *peerAddr)
{
    if (SearchPeerInfoNode(session, peerAddr) == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return;
    }

    DataBackPressure pressure;
    if (DecodeBackPressFrame((BackPressureFrame *)dFileFrame, &pressure) != NSTACKX_EOK) {
        return;
    }

    DFileSessionResolveBackPress(session, pressure, session->clientSendThreadNum);

    DFILE_LOGI(TAG, "handle back pressure recvListOverIo %u recvBufThreshold %u stopSendPeriod %u",
        pressure.recvListOverIo, pressure.recvBufThreshold,
        pressure.stopSendPeriod);
}
833
CreateTrans(uint16_t transId,DFileSession * session,PeerInfo * peerInfo,uint8_t isSender)834 static DFileTrans *CreateTrans(uint16_t transId, DFileSession *session, PeerInfo *peerInfo, uint8_t isSender)
835 {
836 DFileTransPara transPara;
837
838 if (peerInfo == NULL) {
839 return NULL;
840 }
841 (void)memset_s(&transPara, sizeof(transPara), 0, sizeof(transPara));
842 transPara.isSender = isSender;
843 transPara.transId = transId; /* for receiver, use transId of sender */
844 transPara.fileManager = session->fileManager;
845 transPara.writeHandle = DFileWriteHandle;
846 transPara.msgReceiver = DTransMsgReceiver;
847 transPara.connType = peerInfo->connType;
848 transPara.context = peerInfo;
849 transPara.session = session;
850 transPara.onRenameFile = session->onRenameFile;
851
852 ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
853 return DFileTransCreate(&transPara);
854 }
855
/*
 * Route a transfer-level frame to the DFileTrans identified by the frame's transId.
 *
 * If no such transfer exists yet, a receiver-side transfer is created, but only for a
 * FILE_HEADER frame whose transId is not already recorded as done by IsTransIdDone().
 * Returns NSTACKX_EFAILED when the peer is unknown, transId is 0, the frame cannot start
 * a transfer, or the transfer cannot be created (in which case DFILE_ON_FATAL_ERROR with
 * NSTACKX_ENOMEM is reported); otherwise returns the result of HandleDFileFrame().
 */
static int32_t DFileSessionHandleFrame(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    PeerInfo *peer = SearchPeerInfoNode(session, peerAddr);
    if (peer == NULL) {
        DFILE_LOGI(TAG, "can't find peerInfo");
        return NSTACKX_EFAILED;
    }

    /* On the server, a frame from a peer still in SETTING_NEGOTIATING ends the negotiation. */
    if (peer->session->sessionType == DFILE_SESSION_TYPE_SERVER && peer->state == SETTING_NEGOTIATING) {
        peer->state = SETTING_NEGOTIATED;
        TimerDelete(peer->settingTimer);
        peer->settingTimer = NULL;
    }

    const uint16_t transId = ntohs(dFileFrame->header.transId);
    if (transId == 0) {
        DFILE_LOGE(TAG, "transId is 0");
        return NSTACKX_EFAILED;
    }

    DFileTrans *trans = SearchDFileTransNode(&(session->dFileTransChain), transId);
    if (trans != NULL) {
        return HandleDFileFrame(trans, dFileFrame);
    }

    /* Only HEADER frame can start dfile transfer (receiver) */
    if (dFileFrame->header.type != NSTACKX_DFILE_FILE_HEADER_FRAME) {
        DFILE_LOGE(TAG, "trans %u is NULL && type is %u", transId, dFileFrame->header.type);
        return NSTACKX_EFAILED;
    }
    if (IsTransIdDone(session, transId) == NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }

    trans = CreateTrans(transId, session, peer, NSTACKX_FALSE);
    if (trans == NULL) {
        DFileMsg fatalMsg;
        (void)memset_s(&fatalMsg, sizeof(fatalMsg), 0, sizeof(fatalMsg));
        fatalMsg.errorCode = NSTACKX_ENOMEM;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &fatalMsg);
        DFILE_LOGE(TAG, "trans is NULL");
        return NSTACKX_EFAILED;
    }

    /* A failure to set the MTU is logged but does not abort the new transfer. */
    if (DFileTransSetMtu(trans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }
    CalculateSessionTransferRatePrepare(session);
    ListInsertTail(&(session->dFileTransChain), &(trans->list));
    peer->currentTransCount++;

    return HandleDFileFrame(trans, dFileFrame);
}
905
DFileWriteHandle(const uint8_t * frame,size_t len,void * context)906 int32_t DFileWriteHandle(const uint8_t *frame, size_t len, void *context)
907 {
908 PeerInfo *peerInfo = context;
909 DFileSession *session = peerInfo->session;
910 QueueNode *queueNode = NULL;
911 struct sockaddr_in peerAddr;
912
913 peerAddr = peerInfo->dstAddr;
914 queueNode = CreateQueueNode(frame, len, &peerAddr, peerInfo->socketIndex);
915 if (queueNode == NULL) {
916 return NSTACKX_ENOMEM;
917 }
918
919 if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
920 DFILE_LOGE(TAG, "pthread mutex lock failed");
921 free(queueNode->frame);
922 free(queueNode);
923 return NSTACKX_EFAILED;
924 }
925 ListInsertTail(&session->outboundQueue, &queueNode->list);
926 session->outboundQueueSize++;
927 if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
928 /* Don't need to free node, as it's mount to outboundQueue */
929 DFILE_LOGE(TAG, "pthread mutex unlock failed");
930 return NSTACKX_EFAILED;
931 }
932 SemPost(&session->outboundQueueWait[peerInfo->socketIndex]);
933 return (int32_t)len;
934 }
935
BindMainLoopToTargetCpu(void)936 static void BindMainLoopToTargetCpu(void)
937 {
938 int32_t cpu;
939 int32_t cpus = GetCpuNum();
940 if (cpus >= FIRST_CPU_NUM_LEVEL) {
941 return;
942 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
943 cpu = CPU_IDX_0;
944 } else {
945 return;
946 }
947 StartThreadBindCore(cpu);
948 }
949
/*
 * Compute how long the main loop may block in epoll.
 *
 * Returns 0 when the main loop reads the inbound queue itself and that queue is not empty.
 * Otherwise returns the smallest non-negative timeout reported by the session's transfers,
 * never more than DEFAULT_WAIT_TIME_MS.
 */
static int64_t GetEpollWaitTimeOut(DFileSession *session)
{
    if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
        return 0;
    }

    int64_t shortest = DEFAULT_WAIT_TIME_MS;
    List *node = NULL;
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        int64_t candidate = DFileTransGetTimeout((DFileTrans *)node);
        /* Negative values are not treated as a deadline. */
        if (candidate >= 0 && candidate < shortest) {
            shortest = candidate;
        }
    }

    return (shortest > DEFAULT_WAIT_TIME_MS) ? DEFAULT_WAIT_TIME_MS : shortest;
}
972
ProcessSessionTrans(const DFileSession * session,uint16_t exceptTransId)973 static void ProcessSessionTrans(const DFileSession *session, uint16_t exceptTransId)
974 {
975 List *tmp = NULL;
976 List *pos = NULL;
977 LIST_FOR_EACH_SAFE(pos, tmp, &session->dFileTransChain) {
978 DFileTrans *trans = (DFileTrans *)pos;
979 if (trans->transId != exceptTransId) {
980 DFileTransProcess(trans);
981 }
982 }
983 }
984
NotifyBindPort(const DFileSession * session)985 static void NotifyBindPort(const DFileSession *session)
986 {
987 DFileMsg data;
988 int32_t socketNum;
989 (void)memset_s(&data, sizeof(data), 0, sizeof(data));
990 socketNum = 1;
991
992 for (int i = 0; i < socketNum; i++) {
993 data.sockAddr[i].sin_port = ntohs(session->socket[i]->srcAddr.sin_port);
994 data.sockAddr[i].sin_addr.s_addr = ntohl(session->socket[i]->srcAddr.sin_addr.s_addr);
995 }
996 NotifyMsgRecver(session, DFILE_ON_BIND, &data);
997 }
998
/*
 * Entry point of the session's main thread.
 *
 * arg: the DFileSession to serve.
 * The loop runs until session->closeFlag is set or EpollLoop() returns NSTACKX_EFAILED.
 * Each iteration waits in epoll, processes every transfer, and optionally drains part of
 * the inbound queue. On exit a fatal error is reported if epoll failed or the session's
 * fatal flag is set, then the sender and receiver threads are woken up.
 * Always returns NULL.
 */
void *DFileMainLoop(void *arg)
{
    DFileSession *session = arg;
    int32_t ret = NSTACKX_EOK;
    DFileMsg msgData;
    (void)memset_s(&msgData, sizeof(msgData), 0, sizeof(msgData));
    uint8_t isBind = NSTACKX_FALSE;
    DFILE_LOGI(TAG, "main thread start");
    SetThreadName(DFFILE_MAIN_THREAD_NAME);
    SetMaximumPriorityForThread();
    SetTidToBindInfo(session, POS_MAIN_THERD_START);
    NotifyBindPort(session);
    while (!session->closeFlag) {
        /* Block no longer than the nearest transfer deadline (0 if inbound frames are pending). */
        int64_t minTimeout = GetEpollWaitTimeOut(session);
        ret = EpollLoop(session->epollfd, (int32_t)minTimeout);
        if (ret == NSTACKX_EFAILED) {
            DFILE_LOGE(TAG, "epoll wait failed");
            break;
        }
        /* CPU binding is attempted once, the first time transFlag is seen set. */
        if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
            BindMainLoopToTargetCpu();
            isBind = NSTACKX_TRUE;
        }
        /* transId 0 is rejected by DFileSessionHandleFrame, so passing 0 excludes no transfer. */
        ProcessSessionTrans(session, 0);
        if (session->mainLoopActiveReadFlag && session->inboundQueueSize) {
            /* partReadFlag makes ReadEventHandle time-box its work and skip the event counter. */
            session->partReadFlag = NSTACKX_TRUE;
            ReadEventHandle(session);
            session->partReadFlag = NSTACKX_FALSE;
        }
    }

    if (ret == NSTACKX_EFAILED || DFileSessionCheckFatalFlag(session)) {
        msgData.errorCode = NSTACKX_EFAILED;
        NotifyMsgRecver(session, DFILE_ON_FATAL_ERROR, &msgData);
    }

    /* Notify sender thread to terminate */
    PostOutboundQueueWait(session);

    /* Unblock "select" and notify receiver thread to terminate */
    NotifyPipeEvent(session);
    return NULL;
}
1042
/*
 * Reset the peer's amended send rate to its configured send rate and log the
 * configured rate, the measured frame rate and the new amended rate.
 */
static void AmendPeerInfoSendRate(PeerInfo *peerInfo)
{
    peerInfo->amendSendRate = peerInfo->sendRate;

    DFILE_LOGI(TAG, "current: sendrate %u, realsendframerate %u---new amendSendRate %d",
        peerInfo->sendRate,
        peerInfo->sendFrameRate,
        peerInfo->amendSendRate);
}
1049
/*
 * Periodic send-side statistics for a client session.
 *
 * Does nothing on non-client sessions. Once more than peerInfo->rateStateInterval
 * microseconds have elapsed since peerInfo->startTime, the function:
 *   - recomputes the file manager's read rate (peer with socketIndex 0 only),
 *   - recomputes the peer's frame rate and throughput in MB/s,
 *   - averages the accumulated qdisc samples,
 *   - logs a summary, resets the amended send rate and clears the session/peer counters,
 *   - restarts the measurement window.
 */
static void DFileSendCalculateRate(DFileSession *session, PeerInfo *peerInfo)
{
    uint64_t sendCount;

    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    struct timespec nowTime;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    /* Statistics are refreshed only once per measurement window; measureElapse is non-zero inside. */
    if (measureElapse > peerInfo->rateStateInterval) {
        sendCount = (uint64_t)peerInfo->sendCount;
        /* Only the peer on socket 0 updates (and then resets) the file manager's read counters. */
        if (peerInfo->socketIndex == 0) {
            session->fileManager->iorRate = (uint32_t)(session->fileManager->iorBytes *
                NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
            DFILE_LOGI(TAG, "IO read rate: %u MB/s send list full times %u", session->fileManager->iorRate,
                session->fileManager->sendListFullTimes);
            session->fileManager->sendListFullTimes = 0;
            session->fileManager->iorBytes = 0;
        }

        peerInfo->sendFrameRate = (uint32_t)(sendCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->sendCountRateMB = (uint32_t)(sendCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        /* qdiscAveLeft holds a running sum until here; turn it into an average if samples exist. */
        if (peerInfo->qdiscSearchNum != 0) {
            peerInfo->qdiscAveLeft = peerInfo->qdiscAveLeft / peerInfo->qdiscSearchNum;
        }

        DFILE_LOGI(TAG, "framesize %u maxsendrate %u sendRate %u, amendSendRate %d sendCount %llu,"
            "measureElapse %llu sendFrameRate %u %uMB/s,"
            "total send block num %llu eAgainCount %u send list empty times %u sleep times %u, noPendingData %u,"
            "min qdisc %u max qdisc %u search num %u ave qdisc %u"
            "totalRecvBlocks %llu socket:%u "
            "overRun %llu maxRetryCountPerSec %u maxRetryCountLastSec %u wlanCatagory %u",
            peerInfo->dataFrameSize, peerInfo->maxSendRate, peerInfo->sendRate, peerInfo->amendSendRate,
            peerInfo->sendCount, measureElapse, peerInfo->sendFrameRate, peerInfo->sendCountRateMB,
            NSTACKX_ATOM_FETCH(&(session->totalSendBlocks)), peerInfo->eAgainCount,
            NSTACKX_ATOM_FETCH(&(session->sendBlockListEmptyTimes)), session->sleepTimes,
            NSTACKX_ATOM_FETCH(&(session->noPendingDataTimes)), peerInfo->qdiscMinLeft,
            peerInfo->qdiscMaxLeft, peerInfo->qdiscSearchNum, peerInfo->qdiscAveLeft,
            NSTACKX_ATOM_FETCH(&(session->totalRecvBlocks)), peerInfo->socketIndex,
            peerInfo->overRun, peerInfo->maxRetryCountPerSec, peerInfo->maxRetryCountLastSec, session->wlanCatagory);
        /* Counters are cleared only after they have been logged. */
        AmendPeerInfoSendRate(peerInfo);
        ClearSessionStats(session);
        ClearPeerinfoStats(peerInfo);
        /* Start the next measurement window. */
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1101
/*
 * Sum retryCount over every transfer of the session and store the total in
 * peerInfo->allDtransRetryCount.
 */
void UpdateAllTransRetryCount(DFileSession *session, PeerInfo *peerInfo)
{
    uint32_t total = 0;
    List *node = NULL;

    /* for client , only one peer */
    LIST_FOR_EACH(node, &session->dFileTransChain) {
        total += ((DFileTrans *)node)->retryCount;
    }

    peerInfo->allDtransRetryCount = total;
}
1113
/*
 * Periodic receive-side statistics for a server session.
 *
 * Only FILE_DATA frames received by a server session from a known peer are counted.
 * Two independent windows, both of length peerInfo->rateStateInterval microseconds, are kept:
 *   - session->measureBefore drives the file manager's write rate (iowRate, iowCount, iowMaxRate);
 *   - peerInfo->startTime drives the peer's receive frame rate and throughput.
 */
static void DFileRecvCalculateRate(DFileSession *session, DFileFrame *dFileFrame, struct sockaddr_in *peerAddr)
{
    struct timespec nowTime;
    uint64_t recvCount;
    uint32_t timeOut;

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER ||
        dFileFrame->header.type != NSTACKX_DFILE_FILE_DATA_FRAME) {
        return;
    }

    PeerInfo *peerInfo = SearchPeerInfoNode(session, peerAddr);
    if (peerInfo == NULL) {
        return;
    }

    /* The control-frame timeout constant depends on the connection type (P2P vs. WLAN). */
    timeOut = (peerInfo->connType == CONNECT_TYPE_P2P) ? NSTACKX_P2P_MAX_CONTROL_FRAME_TIMEOUT :
        NSTACKX_WLAN_MAX_CONTROL_FRAME_TIMEOUT;

    peerInfo->recvCount++;
    ClockGetTime(CLOCK_MONOTONIC, &nowTime);
    uint64_t measureElapse = GetTimeDiffUs(&nowTime, &session->measureBefore);
    if (measureElapse > peerInfo->rateStateInterval) {
        session->fileManager->iowRate = (uint32_t)(session->fileManager->iowBytes *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_KILOBYTES);
        /*
         * NOTE(review): this divides by peerInfo->dataFrameSize without a zero check, and the
         * subtraction below is not guarded against rateStateInterval exceeding
         * NSTACKX_MILLI_TICKS * timeOut. Confirm both are guaranteed by the callers.
         */
        session->fileManager->iowCount = session->fileManager->iowBytes / peerInfo->dataFrameSize *
            (NSTACKX_MILLI_TICKS * timeOut - peerInfo->rateStateInterval) / measureElapse;
        /* Track the highest write rate observed so far. */
        if (session->fileManager->iowRate > session->fileManager->iowMaxRate) {
            session->fileManager->iowMaxRate = session->fileManager->iowRate;
        }
        DFILE_LOGI(TAG, "measureElapse %llu iowBytes %llu iowCount %llu IO write rate : %u KB/s", measureElapse,
            session->fileManager->iowBytes, session->fileManager->iowCount, session->fileManager->iowRate);
        session->fileManager->iowBytes = 0;
        ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
    }

    /* Second window: per-peer receive rate, measured from peerInfo->startTime. */
    measureElapse = GetTimeDiffUs(&nowTime, &peerInfo->startTime);
    if (measureElapse > peerInfo->rateStateInterval) {
        recvCount = (uint64_t)peerInfo->recvCount;
        peerInfo->recvFrameRate = (uint32_t)(recvCount * DATA_FRAME_SEND_INTERVAL_US / measureElapse);
        peerInfo->recvCountRateMB = (uint32_t)(recvCount * peerInfo->dataFrameSize *
            NSTACKX_MICRO_TICKS / measureElapse / DFILE_MEGABYTES);
        peerInfo->recvCount = 0;
        ClockGetTime(CLOCK_MONOTONIC, &peerInfo->startTime);
    }
}
1160
CheckElapseTime(const struct timespec * before,uint64_t overRun)1161 static uint64_t CheckElapseTime(const struct timespec *before, uint64_t overRun)
1162 {
1163 struct timespec now;
1164 ClockGetTime(CLOCK_MONOTONIC, &now);
1165 uint64_t elapseUs = GetTimeDiffUs(&now, before);
1166 elapseUs += overRun;
1167 if (elapseUs < DATA_FRAME_SEND_INTERVAL_US) {
1168 if (usleep((useconds_t)((uint64_t)DATA_FRAME_SEND_INTERVAL_US - elapseUs)) != NSTACKX_EOK) {
1169 DFILE_LOGE(TAG, "usleep(DATA_FRAME_SEND_INTERVAL_US - elapseUs) failed %d", errno);
1170 }
1171 return 0;
1172 } else {
1173 uint64_t delta = (elapseUs - DATA_FRAME_SEND_INTERVAL_US);
1174 return delta;
1175 }
1176 }
1177
BindClientSendThreadToTargetCpu(uint32_t idx)1178 static void BindClientSendThreadToTargetCpu(uint32_t idx)
1179 {
1180 int32_t cpu;
1181 int32_t cpus = GetCpuNum();
1182 if (cpus >= FIRST_CPU_NUM_LEVEL) {
1183 return;
1184 } else if (cpus >= SECOND_CPU_NUM_LEVEL) {
1185 cpu = CPU_IDX_1 + (int32_t)idx;
1186 } else if (cpus >= THIRD_CPU_NUM_LEVEL) {
1187 cpu = CPU_IDX_1;
1188 } else {
1189 return;
1190 }
1191 if (cpu > cpus) {
1192 cpu = cpus - 1;
1193 }
1194 StartThreadBindCore(cpu);
1195 }
1196
/*
 * For a client session, restart the measurement window of the peer using `socketIndex`
 * and bind the calling thread via BindClientSendThreadToTargetCpu(0).
 * Nothing happens for other session types or when no peer uses that socket index.
 */
static void DFileSenderUpdateMeasureTime(DFileSession *session, uint8_t socketIndex)
{
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return;
    }

    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer != NULL) {
        ClockGetTime(CLOCK_MONOTONIC, &peer->startTime);
        BindClientSendThreadToTargetCpu(0);
    }
}
1208
TerminateMainThreadFatalInner(void * arg)1209 static void TerminateMainThreadFatalInner(void *arg)
1210 {
1211 DFileSession *session = (DFileSession *)arg;
1212 DFileSessionSetFatalFlag(session);
1213 }
1214
/*
 * Ask the main loop to mark the session as fatally failed by posting
 * TerminateMainThreadFatalInner to the session's event chain.
 * If the event cannot be posted, the fatal flag is set directly from the calling thread.
 */
static void PostFatalEvent(DFileSession *session)
{
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadFatalInner, session) == NSTACKX_EOK) {
        return;
    }

    DFILE_LOGE(TAG, "PostEvent TerminateMainThreadFatalInner failed");
    DFileSessionSetFatalFlag(session);
}
1222
GetSocketWaitMs(uint32_t threadNum)1223 static uint32_t GetSocketWaitMs(uint32_t threadNum)
1224 {
1225 if (threadNum > 1) {
1226 return MULTI_THREADS_SOCKET_WAIT_TIME_MS;
1227 }
1228 return DEFAULT_WAIT_TIME_MS;
1229 }
1230
SetSendThreadName(uint32_t threadIdx)1231 static void SetSendThreadName(uint32_t threadIdx)
1232 {
1233 char name[MAX_THREAD_NAME_LEN] = {0};
1234 if (sprintf_s(name, sizeof(name), "%s%u", DFFILE_SEND_THREAD_NAME_PREFIX, threadIdx) < 0) {
1235 DFILE_LOGE(TAG, "sprintf send thead name failed");
1236 }
1237 SetThreadName(name);
1238 }
1239
/*
 * Start-up argument handed to an additional send thread (DFileAddiSenderHandle).
 * It is allocated by CreateAddiSendThread and freed by the thread once it has copied the fields.
 */
typedef struct {
    struct DFileSession *session; /* session the thread sends for */
    uint32_t threadIdx;           /* index into session->sendThreadPara[] */
}SendThreadCtx;
1244
/*
 * Entry point of an additional client send thread.
 *
 * arg: a SendThreadCtx; its fields are copied and the context is freed immediately.
 * The thread loops until session->addiSenderCloseFlag or session->closeFlag is set, or until
 * sending fails with an error other than NSTACKX_EAGAIN (in which case a fatal event is posted).
 * Frames left in the local unsent list are destroyed before the thread exits.
 * Always returns NULL.
 */
static void *DFileAddiSenderHandle(void *arg)
{
    SendThreadCtx *sendThreadCtx = (SendThreadCtx *)arg;
    struct DFileSession *session = sendThreadCtx->session;
    uint32_t threadIdx = sendThreadCtx->threadIdx;
    /* The context is only a start-up message; everything needed has been copied out. */
    free(sendThreadCtx);
    int32_t ret = NSTACKX_EOK;
    DFILE_LOGI(TAG, "send thread %u start", threadIdx);
    SetSendThreadName(threadIdx);
    SetTidToBindInfo(session, threadIdx + POS_SEND_THERD_START);
    List unsent;
    uint8_t canWrite = NSTACKX_FALSE;
    uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
    BindClientSendThreadToTargetCpu(threadIdx + 1);
    ListInitHead(&unsent);
    while (!session->addiSenderCloseFlag) {
        /* Nothing buffered locally and nothing pending in the file manager: sleep until woken. */
        if (ListIsEmpty(&unsent) && !FileManagerHasPendingData(session->fileManager)) {
            NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
            SemWait(&session->sendThreadPara[threadIdx].sendWait);
            if (session->addiSenderCloseFlag) {
                break;
            }
        }
        /* The previous send hit EAGAIN: wait until socket 0 is writable before retrying. */
        if (ret == NSTACKX_EAGAIN) {
            ret = WaitSocketEvent(NULL, session->socket[0]->sockfd, socketWaitMs, NULL, &canWrite);
            if (ret != NSTACKX_EOK || session->closeFlag) {
                break;
            }
            if (!canWrite) {
                /* Wait ended without the socket becoming writable; keep the EAGAIN state and retry. */
                ret = NSTACKX_EAGAIN;
                continue;
            }
        }
        ret = SendDataFrame(session, &unsent, threadIdx, 0);
        if ((ret < 0 && ret != NSTACKX_EAGAIN) || session->closeFlag) {
            break;
        }
        if (ret == NSTACKX_EAGAIN) {
            continue;
        }
        /* This cycle's work is done; block until semNewCycle is posted. */
        SemWait(&session->sendThreadPara[threadIdx].semNewCycle);
    }
    if (ret < 0 && ret != NSTACKX_EAGAIN) {
        PostFatalEvent(session);
    }
    DestroyIovList(&unsent, session, threadIdx);
    return NULL;
}
1293
CloseAddiSendThread(struct DFileSession * session)1294 static void CloseAddiSendThread(struct DFileSession *session)
1295 {
1296 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1297 return;
1298 }
1299 session->addiSenderCloseFlag = NSTACKX_TRUE;
1300 for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
1301 SemPost(&session->sendThreadPara[i].sendWait);
1302 SemPost(&session->sendThreadPara[i].semNewCycle);
1303 PthreadJoin(session->sendThreadPara[i].senderTid, NULL);
1304 SemDestroy(&session->sendThreadPara[i].sendWait);
1305 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1306 }
1307 }
1308
ErrorHandle(struct DFileSession * session,uint16_t cnt)1309 static void ErrorHandle(struct DFileSession *session, uint16_t cnt)
1310 {
1311 while (cnt > 0) {
1312 SemPost(&session->sendThreadPara[cnt - 1].sendWait);
1313 SemPost(&session->sendThreadPara[cnt - 1].semNewCycle);
1314 PthreadJoin(session->sendThreadPara[cnt - 1].senderTid, NULL);
1315 SemDestroy(&session->sendThreadPara[cnt - 1].sendWait);
1316 SemDestroy(&session->sendThreadPara[cnt - 1].semNewCycle);
1317 cnt--;
1318 }
1319 }
1320
CreateAddiSendThread(struct DFileSession * session)1321 static int32_t CreateAddiSendThread(struct DFileSession *session)
1322 {
1323 uint16_t i;
1324 if (session->sessionType == DFILE_SESSION_TYPE_SERVER || session->clientSendThreadNum <= 1) {
1325 return NSTACKX_EOK;
1326 }
1327 session->addiSenderCloseFlag = NSTACKX_FALSE;
1328 for (i = 0; i < session->clientSendThreadNum - 1; i++) {
1329 SendThreadCtx *sendThreadCtx = (SendThreadCtx *)calloc(1, sizeof(SendThreadCtx));
1330 if (sendThreadCtx == NULL) {
1331 goto L_ERR_SENDER_THREAD;
1332 }
1333
1334 if (SemInit(&session->sendThreadPara[i].sendWait, 0, 0) != 0) {
1335 free(sendThreadCtx);
1336 goto L_ERR_SENDER_THREAD;
1337 }
1338
1339 if (SemInit(&session->sendThreadPara[i].semNewCycle, 0, 0) != 0) {
1340 SemDestroy(&session->sendThreadPara[i].sendWait);
1341 free(sendThreadCtx);
1342 goto L_ERR_SENDER_THREAD;
1343 }
1344
1345 sendThreadCtx->session = session;
1346 sendThreadCtx->threadIdx = i;
1347 if (PthreadCreate(&(session->sendThreadPara[i].senderTid), NULL, DFileAddiSenderHandle, sendThreadCtx)) {
1348 DFILE_LOGE(TAG, "Create sender thread failed");
1349 SemDestroy(&session->sendThreadPara[i].sendWait);
1350 SemDestroy(&session->sendThreadPara[i].semNewCycle);
1351 free(sendThreadCtx);
1352 goto L_ERR_SENDER_THREAD;
1353 }
1354 }
1355 return NSTACKX_EOK;
1356
1357 L_ERR_SENDER_THREAD:
1358 session->addiSenderCloseFlag = NSTACKX_TRUE;
1359 ErrorHandle(session, i);
1360 return NSTACKX_EFAILED;
1361 }
1362
/*
 * Fold one qdisc sample into the peer's statistics: minimum, maximum, sample count and
 * running sum (qdiscAveLeft). A stored minimum of 0 is treated as "no minimum yet".
 */
static void UpdatePeerinfoQdiscInfo(PeerInfo *peerInfo, uint32_t qDiscLeft)
{
    if (peerInfo->qdiscMinLeft == 0 || peerInfo->qdiscMinLeft > qDiscLeft) {
        peerInfo->qdiscMinLeft = (uint16_t)qDiscLeft;
    }

    if (peerInfo->qdiscMaxLeft < qDiscLeft) {
        peerInfo->qdiscMaxLeft = (uint16_t)qDiscLeft;
    }

    peerInfo->qdiscAveLeft += qDiscLeft;
    peerInfo->qdiscSearchNum++;
}
1376
/*
 * Cap the peer's amended send rate by the value GetQdiscLen() reports for the root queue.
 *
 * Nothing is changed when the qdisc query fails or the MTU in use is 0. Otherwise the
 * sample is recorded, converted to a frame count (one frame spans dataFrameSize / mtuInuse
 * MTUs, at least 1), and amendSendRate becomes the smaller of sendRate and that count.
 */
static void UpdatePeerinfoAmendSendrateByQdisc(PeerInfo *peerInfo)
{
    uint32_t qDiscLeft;

    if (GetQdiscLen(peerInfo->localInterfaceName, ROOT_QUEUE, &qDiscLeft) != NSTACKX_EOK) {
        return;
    }
    if (peerInfo->mtuInuse == 0) {
        return;
    }

    uint32_t mtusPerFrame = peerInfo->dataFrameSize / peerInfo->mtuInuse;
    if (mtusPerFrame == 0) {
        mtusPerFrame = 1;
    }

    UpdatePeerinfoQdiscInfo(peerInfo, qDiscLeft);

    uint32_t qdiscFrameBudget = qDiscLeft / mtusPerFrame;
    if (peerInfo->sendRate >= qdiscFrameBudget) {
        peerInfo->amendSendRate = (int32_t)qdiscFrameBudget;
    } else {
        peerInfo->amendSendRate = peerInfo->sendRate;
    }
}
1400
/*
 * Finish a send cycle on `socketIndex`.
 *
 * The cycle is marked as not running. If the number of frames sent in this interval reached
 * the (positive) amended send rate captured before sending, the thread is paced through
 * CheckElapseTime() and the resulting overrun is stored; a cycle that ends with no overrun
 * is counted in session->sleepTimes. Finally the interval counter is reset and the amended
 * send rate is refreshed from the qdisc state.
 */
static void UpdateInfoAfterSend(DFileSession *session, PeerInfo *peerInfo, int32_t curAmendSendRate,
    struct timespec *before, uint8_t socketIndex)
{
    session->cycleRunning[socketIndex] = NSTACKX_FALSE;

    const int quotaReached = (curAmendSendRate > 0) &&
        (peerInfo->intervalSendCount >= (uint32_t)curAmendSendRate);
    if (quotaReached) {
        /* When decreaseStatus is set, the carried-over overrun is dropped before pacing. */
        if (peerInfo->decreaseStatus == 1) {
            peerInfo->overRun = 0;
            peerInfo->decreaseStatus = 0;
        }
        peerInfo->overRun = CheckElapseTime(before, peerInfo->overRun);
        if (peerInfo->overRun == 0) {
            session->sleepTimes++;
        }
    }

    peerInfo->intervalSendCount = 0;
    UpdatePeerinfoAmendSendrateByQdisc(peerInfo);
}
1419
/*
 * Perform one send pass for the sender thread serving `socketIndex`.
 *
 * preQueueNode: in/out cursor handed to SendOutboundFrame().
 * unsent:       list of data frames left over from an earlier pass.
 * before:       start time of the current send cycle, used for pacing.
 *
 * Order of work: (1) on a TCP-capable session, flush leftover data frames first;
 * (2) send queued outbound frames; (3) on a client session with a usable MTU, send data
 * frames, refresh rate statistics, pace the thread and release the additional send threads
 * into their next cycle.
 * Returns the result of the last send step, or NSTACKX_EFAILED when no peer uses the socket.
 */
static int32_t DFileSessionSendFrame(DFileSession *session, QueueNode **preQueueNode, List *unsent,
    struct timespec *before, uint8_t socketIndex)
{
    int32_t ret;

    if (CapsTcp(session) && !ListIsEmpty(unsent)) {
        /* sendRemain is raised only for the duration of this flush. */
        session->sendRemain = 1;
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
        session->sendRemain = 0;
        if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
            DFILE_LOGI(TAG, "ret is %d", ret);
            return ret;
        }
    }

    ret = SendOutboundFrame(session, preQueueNode);
    /* Only client sessions continue to the data-frame and pacing steps below. */
    if (session->sessionType != DFILE_SESSION_TYPE_CLIENT) {
        return ret;
    }
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        DFILE_LOGE(TAG, "can't get valid peerinfo");
        return NSTACKX_EFAILED;
    }
    /* With no MTU in use, the data-frame and pacing steps are skipped. */
    if (peerInfo->mtuInuse == 0) {
        return ret;
    }

    if (ret == NSTACKX_EOK) {
        /* Mark the cycle as running so the caller keeps the original `before` timestamp. */
        if (!session->cycleRunning[socketIndex]) {
            session->cycleRunning[socketIndex] = NSTACKX_TRUE;
        }
        ret = SendDataFrame(session, unsent, (uint32_t)(session->clientSendThreadNum - 1), socketIndex);
    }
    /* Capture the rate before DFileSendCalculateRate() may reset it. */
    int32_t curAmendSendRate = peerInfo->amendSendRate;
    DFileSendCalculateRate(session, peerInfo);

    if ((ret == NSTACKX_EFAILED || ret == NSTACKX_EAGAIN) || session->closeFlag) {
        DFILE_LOGI(TAG, "ret is %d and peerInfo->intervalSendCount is %u", ret, peerInfo->intervalSendCount);
        return ret;
    }

    UpdateInfoAfterSend(session, peerInfo, curAmendSendRate, before, socketIndex);

    /* Let every additional send thread start its next cycle. */
    for (uint16_t i = 0; i < session->clientSendThreadNum - 1; i++) {
        SemPost(&session->sendThreadPara[i].semNewCycle);
    }
    return ret;
}
1469
/*
 * Block the sender on outboundQueueWait[socketIndex] when there is nothing to send:
 * no partially sent queue node, no unsent data frames, no pending data in the file
 * manager and an empty outbound queue. Each such wait is counted in noPendingDataTimes.
 */
static void WaitNewSendData(const QueueNode *queueNode, const List *unsent, DFileSession *session, uint8_t socketIndex)
{
    if (queueNode != NULL || !ListIsEmpty(unsent)) {
        return;
    }
    if (FileManagerHasPendingData(session->fileManager) || session->outboundQueueSize) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
    SemWait(&session->outboundQueueWait[socketIndex]);
}
1478
/*
 * One-time set-up of a sender thread: name it, restart the peer's measurement window,
 * raise the thread priority and record the thread id in the session's bind info.
 * The bind-info slot is only written when it lies below
 * POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM.
 */
static void DFileSenderPre(DFileSession *session, uint8_t socketIndex)
{
    SetSendThreadName((uint32_t)(session->clientSendThreadNum + socketIndex - 1));
    DFileSenderUpdateMeasureTime(session, socketIndex);
    SetMaximumPriorityForThread();

    uint32_t bindSlot = (uint32_t)(session->clientSendThreadNum - 1 + socketIndex + POS_SEND_THERD_START);
    if (bindSlot < POS_SEND_THERD_START + NSTACKX_MAX_CLIENT_SEND_THREAD_NUM) {
        SetTidToBindInfo(session, bindSlot);
    }
}
1491
/*
 * Tear down a sender thread's state: log the total number of sent blocks, stop the
 * additional send threads, destroy the unsent frame list and the pending queue node,
 * and free the thread argument `arg`.
 */
void DFileSenderClose(DFileSession *session, QueueNode *queueNode, List *unsent, void *arg)
{
    DFILE_LOGI(TAG, "DFileSendCalculateRate: total send block num %llu.", session->totalSendBlocks);

    CloseAddiSendThread(session);

    DestroyIovList(unsent, session, session->clientSendThreadNum - 1U);
    DestroyQueueNode(queueNode);

    free(arg);
}
1501
DFileSenderHandle(void * arg)1502 void *DFileSenderHandle(void *arg)
1503 {
1504 DFileSession *session = ((SenderThreadPara*)arg)->session;
1505 uint8_t socketIndex = ((SenderThreadPara*)arg)->socketIndex;
1506 QueueNode *queueNode = NULL;
1507 List unsent;
1508 int32_t ret = NSTACKX_EOK;
1509 struct timespec before;
1510 uint8_t canWrite = NSTACKX_FALSE;
1511 uint32_t socketWaitMs = GetSocketWaitMs(session->clientSendThreadNum);
1512 uint8_t isBind = NSTACKX_FALSE;
1513
1514 if (CreateAddiSendThread(session) != NSTACKX_EOK) {
1515 PostFatalEvent(session);
1516 return NULL;
1517 }
1518 ListInitHead(&unsent);
1519 DFileSenderPre(session, socketIndex);
1520 while (!session->closeFlag) {
1521 WaitNewSendData(queueNode, &unsent, session, socketIndex);
1522 if (session->closeFlag) {
1523 break;
1524 }
1525 if (!session->cycleRunning[socketIndex]) {
1526 ClockGetTime(CLOCK_MONOTONIC, &before);
1527 }
1528 if (ret == NSTACKX_EAGAIN) {
1529 ret = WaitSocketEvent(NULL, session->socket[socketIndex]->sockfd, socketWaitMs, NULL, &canWrite);
1530 if (ret != NSTACKX_EOK || session->closeFlag) {
1531 break;
1532 }
1533 if (!canWrite) {
1534 ret = NSTACKX_EAGAIN;
1535 continue;
1536 }
1537 }
1538 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT && isBind == NSTACKX_FALSE &&
1539 session->transFlag == NSTACKX_TRUE) {
1540 BindClientSendThreadToTargetCpu(0);
1541 isBind = NSTACKX_TRUE;
1542 }
1543 ret = DFileSessionSendFrame(session, &queueNode, &unsent, &before, socketIndex);
1544 if (ret < 0 && ret != NSTACKX_EAGAIN) {
1545 PeerShuttedEvent();
1546 PostFatalEvent(session);
1547 break;
1548 }
1549 }
1550 DFileSenderClose(session, queueNode, &unsent, arg);
1551 return NULL;
1552 }
1553
/*
 * Unlink and destroy every QueueNode on `head`. A NULL or empty list is a no-op.
 */
static void ClearDFileFrameList(List *head)
{
    if (head == NULL) {
        return;
    }
    if (ListIsEmpty(head)) {
        return;
    }

    List *cursor = NULL;
    List *next = NULL;
    LIST_FOR_EACH_SAFE(cursor, next, head) {
        QueueNode *stale = (QueueNode *)cursor;
        ListRemoveNode(&stale->list);
        DestroyQueueNode(stale);
    }
}
1567
CheckDfileType(uint8_t type)1568 static inline int32_t CheckDfileType(uint8_t type)
1569 {
1570 if (type >= NSTACKX_DFILE_TYPE_MAX || type < NSTACKX_DFILE_FILE_HEADER_FRAME) {
1571 return NSTACKX_EFAILED;
1572 }
1573 return NSTACKX_EOK;
1574 }
1575
/*
 * Validate a session type: NSTACKX_EOK when it lies in
 * [DFILE_SESSION_TYPE_CLIENT, DFILE_SESSION_TYPE_SERVER], NSTACKX_EFAILED otherwise.
 */
static inline int32_t CheckSessionType(DFileSessionType type)
{
    const int inRange = (type >= DFILE_SESSION_TYPE_CLIENT) && (type <= DFILE_SESSION_TYPE_SERVER);
    return inRange ? NSTACKX_EOK : NSTACKX_EFAILED;
}
1583
/*
 * Consume the received frames queued on `head` and dispatch each by frame type.
 *
 * Frames whose declared length exceeds NSTACKX_MAX_FRAME_SIZE minus the header size are
 * dropped. SETTING, session-level RST (transId 0) and BACK_PRESSURE frames go to their own
 * handlers; everything else goes to DFileSessionHandleFrame(). Processing stops early when
 * session->closeFlag is set, and whatever is still on the list is destroyed.
 */
static void ProcessDFileFrameList(DFileSession *session, List *head)
{
    /* Declared outside the loop: the value persists across frames until it is reassigned. */
    int32_t handleFrameRet = NSTACKX_EOK;

    while (!ListIsEmpty(head) && !session->closeFlag) {
        QueueNode *node = (QueueNode *)ListPopFront(head);
        if (node == NULL) {
            continue;
        }

        DFileFrame *frame = (DFileFrame *)(node->frame);
        struct sockaddr_in *sender = &node->peerAddr;
        /* Captured up front: the frame must not be inspected again once a handler has run. */
        const uint8_t type = frame->header.type;

        if (ntohs(frame->header.length) > NSTACKX_MAX_FRAME_SIZE - sizeof(DFileFrameHeader)) {
            DFILE_LOGE(TAG, "header length %u is too big", ntohs(frame->header.length));
            DestroyQueueNode(node);
            continue;
        }

        if (CheckDfileType(type) != NSTACKX_EOK ||
            CheckSessionType(session->sessionType) != NSTACKX_EOK) {
            handleFrameRet = NSTACKX_EFAILED;
        } else if (type == NSTACKX_DFILE_SETTING_FRAME) {
            DFileSessionHandleSetting(session, frame, sender, node->socketIndex);
        } else if (type == NSTACKX_DFILE_RST_FRAME &&
            frame->header.transId == 0) {
            DFileSessionHandleRst(session, frame, sender);
        } else if (type == NSTACKX_DFILE_FILE_BACK_PRESSURE_FRAME) {
            DFileSessionHandleBackPressure(session, frame, sender);
        } else {
            /*
             * For NSTACKX_DFILE_FILE_DATA_FRAME, "dFileFrame" may be free when handling failed, and become invalid.
             */
            handleFrameRet = DFileSessionHandleFrame(session, frame, sender);
        }

        /* For FILE_DATA frame, the memory is passed to file manager. */
        const int frameHandedOver = (handleFrameRet == NSTACKX_EOK) && (type == NSTACKX_DFILE_FILE_DATA_FRAME);
        if (!frameHandedOver) {
            free(node->frame);
            node->frame = NULL;
        }
        free(node);
    }

    ClearDFileFrameList(head);
}
1630
/*
 * Drain the session's inbound queue and process the frames.
 *
 * arg: the DFileSession.
 * Two modes, selected by session->partReadFlag:
 *   - clear: the call accounts for one posted read event (unprocessedReadEventCount is decremented);
 *   - set:   the call is time-boxed to DEFAULT_WAIT_TIME_MS, checked before each batch.
 * Each round moves the whole inbound queue to a private list under inboundQueueLock and
 * processes it outside the lock.
 */
static void ReadEventHandle(void *arg)
{
    DFileSession *session = arg;
    List newHead;
    ListInitHead(&newHead);
    struct timespec before, now;
    if (!session->partReadFlag) {
        NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    } else {
        /* `before` is only initialised (and only read) in part-read mode. */
        ClockGetTime(CLOCK_MONOTONIC, &before);
    }
    while (session->inboundQueueSize && !session->closeFlag) {
        if (session->partReadFlag) {
            ClockGetTime(CLOCK_MONOTONIC, &now);
            if (GetTimeDiffMs(&now, &before) >= DEFAULT_WAIT_TIME_MS) {
                break;
            }
        }
        if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "PthreadMutexLock error");
            return;
        }
        /* Take the whole batch in one step so frames are processed without holding the lock. */
        ListMove(&session->inboundQueue, &newHead);
        session->recvBlockNumInner += session->inboundQueueSize;
        session->inboundQueueSize = 0;
        if (PthreadMutexUnlock(&session->inboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "PthreadMutexUnlock error");
            break;
        }
        ProcessDFileFrameList(session, &newHead);
    }
    /* Frames moved out of the queue but not processed (e.g. after an unlock failure) are destroyed. */
    ClearDFileFrameList(&newHead);
}
1664
/*
 * Wrap a received frame in a queue node and append it to the session inbound queue.
 *
 * Returns NSTACKX_ENOMEM when the queue already holds more than MAX_RECVBUF_COUNT nodes or the node
 * cannot be created, NSTACKX_EFAILED when locking or unlocking inboundQueueLock fails, and
 * NSTACKX_EOK otherwise.
 */
static int32_t DFileAddInboundQueue(DFileSession *session, const uint8_t *frame, size_t frameLength,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    QueueNode *node = NULL;

    if (session->inboundQueueSize > MAX_RECVBUF_COUNT) {
        /* Only log every MAX_NOMEM_PRINT-th size value to avoid flooding the log. */
        if ((session->inboundQueueSize % MAX_NOMEM_PRINT) == 0) {
            DFILE_LOGI(TAG, "no mem inboundQueueSize:%llu", session->inboundQueueSize);
        }
        return NSTACKX_ENOMEM;
    }

    node = CreateQueueNode(frame, frameLength, peerAddr, socketIndex);
    if (node == NULL) {
        return NSTACKX_ENOMEM;
    }

    if (PthreadMutexLock(&session->inboundQueueLock) != 0) {
        /* The node was never published, so it is still ours to release. */
        DestroyQueueNode(node);
        return NSTACKX_EFAILED;
    }
    ListInsertTail(&session->inboundQueue, &node->list);
    session->inboundQueueSize++;
    session->recvBlockNumDirect++;

    /* On unlock failure the node is already in the list and must not be destroyed here. */
    return (PthreadMutexUnlock(&session->inboundQueueLock) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
1694
/* For server sessions, store the current monotonic time in session->measureBefore. */
static void DFileReceiverUpdateSessionMeasureTime(DFileSession *session)
{
    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        return;
    }
    ClockGetTime(CLOCK_MONOTONIC, &session->measureBefore);
}
1701
/*
 * Bind the calling thread to a CPU core chosen from the session type and the CPU count.
 *
 * Non-server sessions always bind to CPU_IDX_2. Server sessions bind to CPU_IDX_1 or CPU_IDX_0
 * depending on which CPU-count level is reached, and are left unbound when the count is at least
 * FIRST_CPU_NUM_LEVEL or below THIRD_CPU_NUM_LEVEL.
 */
static void BindServerRecvThreadToTargetCpu(DFileSession *session)
{
    int32_t cpuCount = GetCpuNum();

    if (session->sessionType != DFILE_SESSION_TYPE_SERVER) {
        StartThreadBindCore(CPU_IDX_2);
        return;
    }

    if (cpuCount >= FIRST_CPU_NUM_LEVEL) {
        return;
    }
    if (cpuCount >= SECOND_CPU_NUM_LEVEL) {
        StartThreadBindCore(CPU_IDX_1);
        return;
    }
    if (cpuCount >= THIRD_CPU_NUM_LEVEL) {
        StartThreadBindCore(CPU_IDX_0);
    }
}
1722
/*
 * Ask the main loop to run ReadEventHandle for this session.
 *
 * Nothing is posted while MAX_UNPROCESSED_READ_EVENT_COUNT or more read events are still pending.
 * mainLoopActiveReadFlag is set to NSTACKX_TRUE when posting fails and to NSTACKX_FALSE when it
 * succeeds.
 */
static void PostReadEventToMainLoop(DFileSession *session)
{
    if (NSTACKX_ATOM_FETCH(&(session->unprocessedReadEventCount)) >= MAX_UNPROCESSED_READ_EVENT_COUNT) {
        return;
    }

    NSTACKX_ATOM_FETCH_INC(&session->unprocessedReadEventCount);
    if (PostEvent(&session->eventNodeChain, session->epollfd, ReadEventHandle, session) == NSTACKX_EOK) {
        session->mainLoopActiveReadFlag = NSTACKX_FALSE;
        return;
    }

    /* Posting failed: undo the pending-event accounting done above. */
    DFILE_LOGE(TAG, "post read event failed");
    NSTACKX_ATOM_FETCH_DEC(&session->unprocessedReadEventCount);
    session->mainLoopActiveReadFlag = NSTACKX_TRUE;
}
1737
/*
 * Validate a received buffer and hand it to the main loop through the inbound queue.
 *
 * A buffer that fails to decode, or that cannot be queued because of NSTACKX_ENOMEM, is dropped and
 * NSTACKX_EOK is still returned. Any other queueing error returns NSTACKX_EFAILED.
 */
int32_t DFileSessionHandleReadBuffer(DFileSession *session, const uint8_t *buf, size_t bufLen,
    struct sockaddr_in *peerAddr, uint8_t socketIndex)
{
    DFileFrame *decoded = NULL;
    int32_t queueRet;

    if (DecodeDFileFrame(buf, bufLen, &decoded) != NSTACKX_EOK) {
        /* The buffer is not a well-formed frame: drop it without reporting an error. */
        DFILE_LOGE(TAG, "drop malformed frame");
        return NSTACKX_EOK;
    }

    queueRet = DFileAddInboundQueue(session, buf, bufLen, peerAddr, socketIndex);
    if (queueRet == NSTACKX_ENOMEM) {
        return NSTACKX_EOK;
    }
    if (queueRet != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "frame add in bound queue failed :%d", queueRet);
        return NSTACKX_EFAILED;
    }

    PostReadEventToMainLoop(session);
    DFileRecvCalculateRate(session, decoded, peerAddr);
    return NSTACKX_EOK;
}
1760
/*
 * One-time setup run at the start of the receiver thread: names the thread, updates the session
 * measure time, raises the thread priority and records the thread in the session bind info.
 */
static void DFileRecverPre(DFileSession *session)
{
    SetThreadName(DFFILE_RECV_THREAD_NAME);

    DFileReceiverUpdateSessionMeasureTime(session);

    SetMaximumPriorityForThread();

    SetTidToBindInfo(session, POS_RECV_THERD_START);
}
1768
/*
 * Wait up to DEFAULT_WAIT_TIME_MS for a read event on the receiving socket.
 *
 * The accepted socket is used once acceptFlag is non-zero; before that, socket[0] is used.
 * The result of WaitSocketEvent is returned unchanged and *canRead is filled in by it.
 */
int32_t RcverWaitSocket(DFileSession *session, uint8_t *canRead)
{
    if (session->acceptFlag != 0) {
        return WaitSocketEvent(session, session->acceptSocket->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
    }
    return WaitSocketEvent(session, session->socket[0]->sockfd, DEFAULT_WAIT_TIME_MS, canRead, NULL);
}
1777
/* Receive from the session socket; forwards to DFileSocketRecvSP and returns its result. */
int32_t DFileSocketRecv(DFileSession *session)
{
    int32_t recvRet = DFileSocketRecvSP(session);
    return recvRet;
}
1782
/*
 * Accept a connection on socket[0] and store it in session->acceptSocket.
 *
 * On success acceptFlag is set, TCP keep-alive is enabled on the accepted socket and the accept
 * event is reported. Returns NSTACKX_EOK on success and NSTACKX_EFAILED when accepting fails.
 */
int32_t DFileAcceptSocket(DFileSession *session)
{
    session->acceptSocket = AcceptSocket(session->socket[0]);
    if (session->acceptSocket == NULL) {
        DFILE_LOGI(TAG, "accept socket failed");
        return NSTACKX_EFAILED;
    }

    DFILE_LOGI(TAG, "accept socket success");
    session->acceptFlag = 1;
    SetTcpKeepAlive(session->acceptSocket->sockfd);

    AcceptSocketEvent();
    return NSTACKX_EOK;
}
1799
DFileReceiverHandle(void * arg)1800 void *DFileReceiverHandle(void *arg)
1801 {
1802 DFileSession *session = arg;
1803 uint8_t canRead = NSTACKX_FALSE;
1804 int32_t ret = NSTACKX_EAGAIN;
1805 uint8_t isBind = NSTACKX_FALSE;
1806
1807 DFILE_LOGI(TAG, "recv thread start");
1808 DFileRecverPre(session);
1809 while (!session->closeFlag) {
1810 if (ret == NSTACKX_EAGAIN || !canRead) {
1811 ret = RcverWaitSocket(session, &canRead);
1812 if (ret != NSTACKX_EOK || session->closeFlag) {
1813 break;
1814 }
1815 }
1816 if (!canRead) {
1817 continue;
1818 }
1819 if (isBind == NSTACKX_FALSE && session->transFlag == NSTACKX_TRUE) {
1820 BindServerRecvThreadToTargetCpu(session);
1821 isBind = NSTACKX_TRUE;
1822 }
1823
1824 ret = DFileSocketRecv(session);
1825 if (ret != NSTACKX_EAGAIN && ret != NSTACKX_EOK) {
1826 PeerShuttedEvent();
1827 break;
1828 }
1829 }
1830 DFILE_LOGI(TAG, "Total recv blocks: direct %llu inner %llu", session->recvBlockNumDirect,
1831 session->recvBlockNumInner);
1832 if (ret < 0 && ret != NSTACKX_EAGAIN && ret != NSTACKX_PEER_CLOSE) {
1833 PostFatalEvent(session);
1834 }
1835
1836 DFILE_LOGI(TAG, "session %u Exit receiver thread ret %d", session->sessionId, ret);
1837 return NULL;
1838 }
1839
DFileControlHandle(void * arg)1840 void *DFileControlHandle(void *arg)
1841 {
1842 SetThreadName(DFFILE_CONTROL_THREAD_NAME);
1843 DFileSession *session = arg;
1844 if (session->sessionType == DFILE_SESSION_TYPE_CLIENT) {
1845 DFileSenderControlHandle(session);
1846 } else {
1847 DFileReceiverControlHandle(session);
1848 }
1849 return NULL;
1850 }
1851
/*
 * Replace every entry of fileListInfo->files with its resolved path from realpath().
 *
 * Each original string is freed once its replacement is in place. Processing stops at the first
 * path that cannot be resolved; entries before it stay replaced and the rest stay untouched.
 * Returns NSTACKX_EOK when all paths were resolved, NSTACKX_EFAILED otherwise.
 */
static int32_t RealPathFileName(FileListInfo *fileListInfo)
{
    uint32_t idx;

    for (idx = 0; idx < fileListInfo->fileNum; idx++) {
        char *original = fileListInfo->files[idx];
        /* With a NULL buffer realpath() allocates the result, which is then owned by the list. */
        char *resolved = realpath(original, NULL);
        if (resolved == NULL) {
            DFILE_LOGE(TAG, "realpath failed");
            return NSTACKX_EFAILED;
        }
        fileListInfo->files[idx] = resolved;
        free(original);
    }
    return NSTACKX_EOK;
}
1871
/*
 * Free the file list container: the files pointer array, the remote path and the structure
 * itself. The strings referenced by files[] are not freed here.
 */
static void FreeTransFileListInfo(FileListInfo *fileListInfo)
{
    free(fileListInfo->files);
    fileListInfo->files = NULL;

    /* free(NULL) is a no-op, so an unset remote path needs no special case. */
    free(fileListInfo->remotePath);
    fileListInfo->remotePath = NULL;

    free(fileListInfo);
}
1882
/*
 * Create a sending transfer for fileListInfo and append it to the session transfer chain.
 *
 * The transfer id is lastDFileTransId + 1, skipping 0 on wrap-around. On success the session
 * counters and lastDFileTransId are updated and the fileListInfo container is released here.
 * On any failure the newly created transfer is destroyed and fileListInfo is not freed here.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_ENOMEM when the transfer cannot be created, the error
 * from DFileTransSendFiles when sending fails, and NSTACKX_EFAILED for the other failures.
 */
static int32_t DFileStartTransInner(DFileSession *session, FileListInfo *fileListInfo)
{
    int32_t result;
    uint16_t newTransId = session->lastDFileTransId + 1;
    if (newTransId == 0) { /* wrapped around: 0 is never used as a transfer id */
        newTransId = 1;
    }

    PeerInfo *peer = TransSelectPeerInfo(session);
    DFileTrans *newTrans = CreateTrans(newTransId, session, peer, NSTACKX_TRUE);
    if (newTrans == NULL) {
        DFILE_LOGE(TAG, "CreateTrans error");
        return NSTACKX_ENOMEM;
    }

    /* A failure to set the MTU is only logged; the transfer still proceeds. */
    if (DFileTransSetMtu(newTrans, peer->mtuInuse) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "set trans mtu failed");
    }

    if (RealPathFileName(fileListInfo) != NSTACKX_EOK) {
        DFileTransDestroy(newTrans);
        return NSTACKX_EFAILED;
    }

    result = DFileTransSendFiles(newTrans, fileListInfo);
    if (result != NSTACKX_EOK) {
        DFileTransDestroy(newTrans);
        DFILE_LOGE(TAG, "DFileTransSendFiles fail, error: %d", result);
        return result;
    }

    result = DFileTransAddExtraInfo(newTrans, fileListInfo->pathType, fileListInfo->noticeFileNameType,
        fileListInfo->userData);
    if (result != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "DFileTransAddExtraInfo fail");
        DFileTransDestroy(newTrans);
        return NSTACKX_EFAILED;
    }

    /* Carry the list-level flags over to the transfer's own file list. */
    newTrans->fileList->noSyncFlag = fileListInfo->noSyncFlag;
    newTrans->fileList->tarFile = fileListInfo->tarFile;
    newTrans->fileList->smallFlag = fileListInfo->smallFlag;
    newTrans->fileList->tarFlag = fileListInfo->tarFlag;

    fileListInfo->userData = NULL;
    ListInsertTail(&(session->dFileTransChain), &(newTrans->list));
    session->lastDFileTransId = newTransId;
    if (fileListInfo->smallFlag != NSTACKX_TRUE) {
        session->fileListProcessingCnt++;
    } else {
        session->smallListProcessingCnt++;
    }

    /* The strings in fileListInfo->files[] are reused by the transfer, so only the container is freed. */
    FreeTransFileListInfo(fileListInfo);
    return NSTACKX_EOK;
}
1934
/*
 * Start a transfer for fileListInfo while holding session->transIdLock.
 *
 * Returns NSTACKX_EFAILED when the lock cannot be taken; otherwise returns the result of
 * DFileStartTransInner. An unlock failure is logged but does not change the returned value.
 */
int32_t DFileStartTrans(DFileSession *session, FileListInfo *fileListInfo)
{
    int32_t startRet;

    if (PthreadMutexLock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex lock error");
        return NSTACKX_EFAILED;
    }

    startRet = DFileStartTransInner(session, fileListInfo);

    if (PthreadMutexUnlock(&session->transIdLock) != 0) {
        DFILE_LOGE(TAG, "pthread mutex unlock error");
    }
    return startRet;
}
1947
TerminateMainThreadInner(void * arg)1948 void TerminateMainThreadInner(void *arg)
1949 {
1950 DFileSession *session = (DFileSession *)arg;
1951 DFileSessionSetTerminateFlag(session);
1952 }
1953
/*
 * Start the worker threads of a session, in this order: main loop, sender, receiver, control.
 *
 * If a creation step fails, the terminate flag is set and the threads that were already started
 * are joined in reverse creation order through the fall-through labels below, so that no thread
 * started here is left running after a failed start.
 *
 * Returns NSTACKX_EOK when every thread was created, NSTACKX_EFAILED otherwise.
 */
int32_t StartDFileThreadsInner(DFileSession *session)
{
    if (PthreadCreate(&(session->tid), NULL, DFileMainLoop, session)) {
        DFILE_LOGE(TAG, "Create mainloop thread failed");
        goto L_ERR_MAIN_LOOP_THREAD;
    }

    if (CreateSenderThread(session) != NSTACKX_EOK) {
        goto L_ERR_SENDER_THREAD;
    }

    if (PthreadCreate(&(session->receiverTid), NULL, DFileReceiverHandle, session)) {
        DFILE_LOGE(TAG, "Create receiver thread failed");
        goto L_ERR_RECEIVER_THREAD;
    }

    if (PthreadCreate(&(session->controlTid), NULL, DFileControlHandle, session)) {
        DFILE_LOGE(TAG, "Create control thread failed");
        goto L_ERR_CONTROL_THREAD;
    }
    return NSTACKX_EOK;

L_ERR_CONTROL_THREAD:
    /*
     * The control thread was never created, so controlTid must not be joined. The most recently
     * started thread at this point is the receiver: reap it before invalidating its id.
     */
    DFileSessionSetTerminateFlag(session);
    PthreadJoin(session->receiverTid, NULL);
    session->receiverTid = INVALID_TID;
L_ERR_RECEIVER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /*
     * Post the outbound-queue wait before joining the sender; presumably the sender may be blocked
     * on it (verify against the sender loop). Posting after the join could never help it exit.
     */
    PostOutboundQueueWait(session);
    PthreadJoin(session->senderTid[0], NULL);
    session->senderTid[0] = INVALID_TID;
L_ERR_SENDER_THREAD:
    DFileSessionSetTerminateFlag(session);
    /* Post a terminate event to the main loop before joining it. */
    if (PostEvent(&session->eventNodeChain, session->epollfd, TerminateMainThreadInner, session) != NSTACKX_EOK) {
        DFILE_LOGE(TAG, "post terminate thread failed");
    }
    PthreadJoin(session->tid, NULL);
    session->tid = INVALID_TID;
L_ERR_MAIN_LOOP_THREAD:
    return NSTACKX_EFAILED;
}
1994
/*
 * File manager message callback; context is the owning session.
 *
 * FILE_MANAGER_INNER_ERROR is logged and raised as a fatal session event;
 * FILE_MANAGER_IN_PROGRESS triggers a session progress notification. Other types are ignored.
 */
static void FileManagerMsgHandle(FileManagerMsgType msgType, int32_t errCode, void *context)
{
    DFileSession *session = context;

    switch (msgType) {
        case FILE_MANAGER_INNER_ERROR:
            DFILE_LOGE(TAG, "Session (%u) fatal error -- File Manager error: %d", session->sessionId, errCode);
            PostFatalEvent(session);
            break;
        case FILE_MANAGER_IN_PROGRESS:
            NoticeSessionProgress(session);
            break;
        default:
            break;
    }
}
2007
/*
 * Validate the arguments and create the file manager of a session.
 *
 * Returns NSTACKX_EINVAL for a NULL session or, for a sender, a connection type other than
 * CONNECT_TYPE_P2P / CONNECT_TYPE_WLAN. Returns NSTACKX_EFAILED when a key is supplied
 * (keyLen > 0) with an unsupported length or a NULL pointer, when a key is supplied but crypto is
 * not included, or when FileManagerCreate fails. Returns NSTACKX_EOK on success, with the new
 * manager stored in session->fileManager.
 */
int32_t CreateFileManager(DFileSession *session, const uint8_t *key, uint32_t keyLen, uint8_t isSender,
    uint16_t connType)
{
    FileManagerMsgPara msgPara;

    if (session == NULL) {
        DFILE_LOGE(TAG, "invalid input");
        return NSTACKX_EINVAL;
    }
    if (isSender && !(connType == CONNECT_TYPE_P2P || connType == CONNECT_TYPE_WLAN)) {
        DFILE_LOGE(TAG, "connType for sender is illagal");
        return NSTACKX_EINVAL;
    }

    if (keyLen > 0) {
        if (!(keyLen == AES_128_KEY_LENGTH || keyLen == CHACHA20_KEY_LENGTH) || key == NULL) {
            DFILE_LOGE(TAG, "error key or key len");
            return NSTACKX_EFAILED;
        }
        if (!IsCryptoIncluded()) {
            DFILE_LOGE(TAG, "crypto is not opened");
            return NSTACKX_EFAILED;
        }
    }

    /* File manager messages are delivered to FileManagerMsgHandle with the session as context. */
    msgPara.context = session;
    msgPara.msgReceiver = FileManagerMsgHandle;
    msgPara.eventNodeChain = &session->eventNodeChain;
    msgPara.epollfd = session->epollfd;

    session->fileManager = FileManagerCreate(isSender, &msgPara, key, keyLen, connType);
    if (session->fileManager != NULL) {
        return NSTACKX_EOK;
    }
    DFILE_LOGE(TAG, "create filemanager failed");
    return NSTACKX_EFAILED;
}
2041
/*
 * Close both ends of the session receiver pipe (PIPE_OUT first, then PIPE_IN) and mark each
 * closed descriptor as INVALID_PIPE_DESC. Ends that are already invalid are skipped.
 */
void DestroyReceiverPipe(DFileSession *session)
{
    const int pipeEnds[] = { PIPE_OUT, PIPE_IN };
    size_t i;

    for (i = 0; i < sizeof(pipeEnds) / sizeof(pipeEnds[0]); i++) {
        if (session->receiverPipe[pipeEnds[i]] == INVALID_PIPE_DESC) {
            continue;
        }
        CloseDesc(session->receiverPipe[pipeEnds[i]]);
        session->receiverPipe[pipeEnds[i]] = INVALID_PIPE_DESC;
    }
}
2053
/* Install func as the global DFile event callback. softObj is accepted but not used. */
void DFileSetEvent(void *softObj, DFileEventFunc func)
{
    (void)softObj;
    g_dfileEventFunc = func;
}
2059
NSTACKX_DFileAssembleFunc(void * softObj,const DFileEvent * info)2060 void NSTACKX_DFileAssembleFunc(void *softObj, const DFileEvent *info)
2061 {
2062 if (g_dfileEventFunc != NULL) {
2063 g_dfileEventFunc(softObj, info);
2064 }
2065 }
2066