1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_session.h"
17
18 #include "nstackx_dfile_log.h"
19 #include "nstackx_socket.h"
20
21 #define TAG "nStackXDFile"
22
23 #define WAIT_DATA_FRAME_WAIT_US 5 /* Spend 5us to read one file data frame. */
24
25 #define MAX_NR_IOVCNT 20
26 #define MAX_UDP_PAYLOAD 65507
27
GetIovListSize(void)28 static inline uint32_t GetIovListSize(void)
29 {
30 // updated this value from 40 to 20 now
31 return MAX_NR_IOVCNT;
32 }
33
/*
 * Allocate one batch of GetIovListSize() IovList entries as a single array and
 * append every entry to the list at head, with addr cleared and len set to 0.
 *
 * All entries of a batch live in one allocation, so an individual entry must
 * not be passed to free().
 *
 * Returns NSTACKX_EOK on success, or NSTACKX_ENOMEM (head untouched) when the
 * allocation fails.
 */
static int32_t AllocIovList(List *head)
{
    uint32_t total = GetIovListSize();
    IovList *batch = malloc(sizeof(*batch) * total);
    if (batch == NULL) {
        return NSTACKX_ENOMEM;
    }

    IovList *end = batch + total;
    for (IovList *iov = batch; iov != end; iov++) {
        iov->addr = NULL;
        iov->len = 0;
        ListInsertTail(head, &iov->entry);
    }
    return NSTACKX_EOK;
}
48
49 #ifndef BUILD_FOR_WINDOWS
50 __attribute__((unused))
51 #endif
GetFreeIovList(DFileSession * s,int32_t tid)52 static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
53 {
54 List *p = &s->freeIovList[tid];
55 List *q = NULL;
56
57 if (ListIsEmpty(p)) {
58 int32_t err = AllocIovList(p);
59 if (err != NSTACKX_EOK) {
60 return NULL;
61 }
62 }
63
64 q = ListPopFront(p);
65 return (IovList *)q;
66 }
67
/*
 * Release every node still linked on head.
 *
 * Each node is treated as a BlockFrame: it is unlinked, then its fileDataFrame
 * buffer and the BlockFrame itself are freed. The s and tid parameters are
 * accepted but not used.
 */
void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
{
    List *node = NULL;
    List *nextNode = NULL;

    (void)s;
    (void)tid;
    LIST_FOR_EACH_SAFE(node, nextNode, head) {
        BlockFrame *frame = (BlockFrame *)node;
        ListRemoveNode(node);
        free(frame->fileDataFrame);
        free(frame);
    }
}
83
/*
 * Send the not-yet-sent tail of one file data frame over a TCP socket.
 *
 * block->sendLen is the number of bytes of the frame already sent; len is the
 * total frame length. p is the list node of block.
 *
 * Returns:
 *   > 0              the whole remainder was sent; the block is unlinked and
 *                    freed and the send counters are incremented.
 *   NSTACKX_EAGAIN   only part was sent (sendLen advanced), or nothing was
 *                    sent and errno is EAGAIN; the block stays on its list.
 *   NSTACKX_EFAILED  any other send failure.
 */
static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
{
    DFileSession *session = peerInfo->session;
    int32_t sent = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);

    if (sent > 0) {
        if (sent != (int32_t)(len - block->sendLen)) {
            /* Partial write: remember the progress and ask the caller to retry later. */
            NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
            block->sendLen = block->sendLen + (uint32_t)sent;
            return NSTACKX_EAGAIN;
        }

        /* Frame fully sent: drop it and account for it. */
        block->sendLen = 0;
        ListRemoveNode(p);
        free(block->fileDataFrame);
        free(block);
        NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
        NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
        return sent;
    }

    if (errno == EAGAIN) {
        NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
        return NSTACKX_EAGAIN;
    }

    DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", sent, errno);
    return NSTACKX_EFAILED;
}
112
/*
 * Bookkeeping after one file data frame was sent successfully over UDP:
 * unlink node p, free the frame buffer f and its owning block, then bump the
 * per-peer send counters and the session-wide block counter.
 */
static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
    BlockFrame *block)
{
    /* The node must be unlinked before the memory that contains it is released. */
    ListRemoveNode(p);
    free(f);
    free(block);

    NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
    NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
    NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
}
123
/*
 * Send every block frame queued on head, in order.
 *
 * With the TCP capability the frames go through TcpSendFileDataFrame() on
 * session->socket[0], or on session->acceptSocket for a server session.
 * Otherwise each frame is sent whole with SocketSend() on
 * session->socket[peerInfo->socketIndex].
 *
 * Returns:
 *   NSTACKX_EAGAIN  the socket could not take (all of) a frame; unsent frames
 *                   are left on head so the caller can retry them.
 *   otherwise       the result of the last send attempt. Any frames still on
 *                   head at that point are freed with DestroyIovList().
 *   NSTACKX_EOK     head was empty, so nothing was sent.
 */
static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    List *p = NULL;
    List *n = NULL;
    BlockFrame *block = NULL;
    FileDataFrameZS *f = NULL;
    /* Initialized so that an empty list returns a defined value instead of an indeterminate one. */
    int32_t ret = NSTACKX_EOK;
    uint16_t len;
    Socket *socket = session->socket[0];

    if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
        socket = session->acceptSocket;
    }

    LIST_FOR_EACH_SAFE(p, n, head) {
        block = (BlockFrame *)p;
        f = (FileDataFrameZS *)(void *)block->fileDataFrame;
        /* header.length is in network byte order and does not include the frame header. */
        len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
        if (CapsTcp(session)) {
            ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
            if (ret == NSTACKX_EFAILED) {
                break;
            } else if (ret == NSTACKX_EAGAIN) {
                /* Keep the remaining frames on head for a later retry. */
                return ret;
            }
        } else {
            ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
            if (ret > 0) {
                UdpSendFileDataSuccess(session, peerInfo, p, f, block);
            } else if (ret == NSTACKX_EAGAIN) {
                NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
                return ret;
            } else {
                DFILE_LOGE(TAG, "socket sendto failed");
                break;
            }
        }
    }

    /* Either everything was sent (list already empty) or a hard failure occurred: drop what is left. */
    DestroyIovList(head, session, tid);

    return ret;
}
167
/* Forwards to SendFileDataFrame() with the same arguments and returns its result unchanged. */
static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    int32_t result = SendFileDataFrame(session, peerInfo, head, tid);
    return result;
}
172
/*
 * Initialize head as an empty list, then move up to maxCount nodes from the
 * front of unsent to the tail of head, preserving their order.
 *
 * Returns the number of nodes moved.
 */
static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
{
    int32_t moved;

    ListInitHead(head);
    for (moved = 0; moved < maxCount; moved++) {
        if (ListIsEmpty(unsent)) {
            break;
        }
        List *node = ListPopFront(unsent);
        if (node == NULL) {
            break;
        }
        ListInsertTail(head, node);
    }

    return moved;
}
189
/* Maximum number of block frames gathered for one send pass; currently fixed at 1. */
static int32_t GetMaxSendCount(void)
{
    const int32_t maxBlocksPerSend = 1;
    return maxBlocksPerSend;
}
194
/*
 * Core send loop for one sender thread.
 *
 * head already holds count frames taken from the unsent list. Each pass tops
 * head up from the file manager until GetMaxSendCount() frames are queued (or
 * no data is pending), sends them, and repeats while sending is still allowed:
 *   - TCP: while session->sendRemain is zero;
 *   - UDP: while the peer's intervalSendCount is below amendSendRate and the
 *     session is not closing;
 * and, in both cases, while session->stopSendCnt[tid] is zero.
 *
 * Returns NSTACKX_EFAILED when no peer matches socketIndex, NSTACKX_EOK when
 * there was nothing to send, otherwise the result of the last
 * SendFileDataFrameEx() call.
 */
static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peer = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peer == NULL) {
        return NSTACKX_EFAILED;
    }

    BlockFrame *chain = NULL;
    int32_t result;
    int32_t keepSending;
    int32_t limit = GetMaxSendCount();

    do {
        while (count < limit && FileManagerHasPendingData(session->fileManager)) {
            result = FileManagerFileRead(session->fileManager, tid, &chain, limit - count);
            if (result < 0) {
                /*
                 * NOTE(review): if count is still 0 here, the check below reports
                 * NSTACKX_EOK, so this read error is only visible in the log.
                 */
                DFILE_LOGE(TAG, "FileManagerFileRead failed %d", result);
                break;
            }
            if (result == 0) {
                /* Data is pending but not readable yet: back off briefly and poll again. */
                NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
                (void)usleep(WAIT_DATA_FRAME_WAIT_US);
                continue;
            }
            /* Walk the returned chain via list.next until NULL, queuing each frame on head. */
            while (chain != NULL) {
                List *following = chain->list.next;
                ListInsertTail(head, &chain->list);
                chain = (BlockFrame *)(void *)following;
            }
            count += result;
        }

        if (count == 0) {
            NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
            result = NSTACKX_EOK;
            break;
        }
        result = SendFileDataFrameEx(session, peer, head, tid);
        if (result <= 0) {
            break;
        }

        /* Everything queued was sent; start the next pass with an empty batch. */
        count = 0;
        limit = GetMaxSendCount();
        if (CapsTcp(session)) {
            keepSending = !session->sendRemain;
        } else {
            keepSending = (peer->intervalSendCount < (uint16_t)peer->amendSendRate && !session->closeFlag);
        }
    } while (keepSending && (session->stopSendCnt[tid] == 0));
    return result;
}
242
243
244 /*
245 * * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
246 * * if backpress frame count is zero then send packet normally
247 * */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)248 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
249 {
250 uint32_t fileProcessCnt;
251 uint32_t sleepTime;
252 uint32_t stopCnt;
253 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
254 if (peerInfo == NULL) {
255 return;
256 }
257
258 if (session->stopSendCnt[tid] != 0) {
259 if (PthreadMutexLock(&session->backPressLock) != 0) {
260 DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
261 return;
262 }
263
264 stopCnt = session->stopSendCnt[tid];
265 if (stopCnt == 0) {
266 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
267 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
268 }
269 return;
270 }
271
272 /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
273 fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
274
275 session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
276 fileProcessCnt) : 0;
277
278 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
279 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
280 return;
281 }
282
283 sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
284
285 #ifndef NSTACKX_WITH_LITEOS
286 DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
287 fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
288 #endif
289 (void)usleep(sleepTime);
290 }
291 }
292
/*
 * Entry point for sending file data frames from sender thread tid.
 *
 * Applies the back-pressure throttle, moves up to GetMaxSendCount() frames
 * from unsent into a local queue and hands that queue to DoSendDataFrame().
 * When the send reports NSTACKX_EAGAIN, the local queue is moved back to
 * unsent with ListMove() so the frames are retried on the next call.
 *
 * Returns NSTACKX_EFAILED when no peer matches socketIndex, NSTACKX_EOK when
 * the peer's amendSendRate is zero (nothing is sent), otherwise the result of
 * DoSendDataFrame().
 */
int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        return NSTACKX_EFAILED;
    }
    if (peerInfo->amendSendRate == 0) {
        return NSTACKX_EOK;
    }

    CheckSendByBackPress(session, tid, socketIndex);

    List pending;
    int32_t queued = CheckUnsentList(unsent, &pending, GetMaxSendCount());
    int32_t result = DoSendDataFrame(session, &pending, queued, tid, socketIndex);
    if (result == NSTACKX_EAGAIN) {
        ListMove(&pending, unsent);
    }
    return result;
}
316
/*
 * Send one queued control frame.
 *
 * Without the TCP capability the whole frame is sent on
 * session->socket[queueNode->socketIndex]; a non-positive result other than
 * NSTACKX_EAGAIN is logged and mapped to NSTACKX_EFAILED.
 *
 * With the TCP capability only the part after queueNode->sendLen is sent, on
 * acceptSocket for a server session and on socket[0] otherwise. A partial
 * write advances sendLen and yields NSTACKX_EAGAIN; a complete write resets
 * sendLen to 0.
 *
 * Returns a positive byte count on success, NSTACKX_EAGAIN when the frame must
 * be retried, or NSTACKX_EFAILED on error.
 */
int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
{
    int32_t sent;

    if (!CapsTcp(session)) {
        uint8_t socketIndex = queueNode->socketIndex;
        sent = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
        if (sent <= 0 && sent != NSTACKX_EAGAIN) {
            DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
            sent = NSTACKX_EFAILED;
        }
        return sent;
    }

    Socket *socket = NULL;
    if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
        socket = session->acceptSocket;
    } else {
        socket = session->socket[0];
    }

    sent = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
    if (sent > 0) {
        if (sent == (int32_t)(queueNode->length - queueNode->sendLen)) {
            queueNode->sendLen = 0;
            return sent;
        }
        /* Partial write: keep the progress so the next attempt resumes from here. */
        queueNode->sendLen = queueNode->sendLen + (uint32_t)sent;
        return NSTACKX_EAGAIN;
    }

    if (errno == EAGAIN) {
        return NSTACKX_EAGAIN;
    }

    DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", sent, errno);
    return NSTACKX_EFAILED;
}
350
/*
 * Drain the session's outbound control-frame queue.
 *
 * Sending starts with *preQueueNode when it is non-NULL (a frame left over
 * from an earlier call); after that, frames are popped from
 * session->outboundQueue under outboundQueueLock. For a UDP server socket the
 * destination address is set from the frame's peerAddr before each send.
 * The loop ends when the queue is empty, a send does not succeed, a lock
 * operation fails, or session->closeFlag is set.
 *
 * On NSTACKX_EAGAIN the unsent frame is handed back through *preQueueNode.
 * In every other case *preQueueNode is set to NULL and the current frame
 * (if any) is destroyed.
 *
 * Returns NSTACKX_EOK when the queue was drained, NSTACKX_EFAILED on a lock
 * failure, otherwise the result of the last SendControlFrame() call.
 */
int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
{
    QueueNode *current = *preQueueNode;
    int32_t result;

    for (;;) {
        if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex lock failed");
            result = NSTACKX_EFAILED;
            break;
        }
        if (current == NULL && session->outboundQueueSize) {
            current = (QueueNode *)ListPopFront(&session->outboundQueue);
            session->outboundQueueSize--;
        }
        if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex unlock failed");
            result = NSTACKX_EFAILED;
            break;
        }
        if (current == NULL) {
            result = NSTACKX_EOK;
            break;
        }

        uint32_t socketIndex = current->socketIndex;
        Socket *target = session->socket[socketIndex];
        if (target->protocol == NSTACKX_PROTOCOL_UDP && target->isServer == NSTACKX_TRUE) {
            target->dstAddr = current->peerAddr;
        }

        result = SendControlFrame(session, current);
        if (result <= 0) {
            break;
        }

        /* Send ok, release this frame and try to get the next one. */
        DestroyQueueNode(current);
        current = NULL;
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);

        if (session->closeFlag) {
            break;
        }
    }

    if (result == NSTACKX_EAGAIN) {
        /* Hand the pending frame back so the caller can resume with it. */
        *preQueueNode = current;
        return result;
    }

    *preQueueNode = NULL;
    DestroyQueueNode(current);
    return result;
}
401
/*
 * Receive exactly length more bytes from the TCP peer.
 *
 * Data is written at buffer + session->recvLen, and session->recvLen is
 * advanced by every chunk received, so progress survives an early return.
 * The socket used is acceptSocket for a server session and socket[0]
 * otherwise.
 *
 * Returns the number of bytes received by this call once length bytes have
 * arrived, NSTACKX_PEER_CLOSE when the receive returns 0, NSTACKX_EAGAIN when
 * it fails with errno EAGAIN, or NSTACKX_EFAILED on any other receive failure.
 */
int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    Socket *socket = session->socket[0];
    int received = 0;

    if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
        socket = session->acceptSocket;
    }

    while (received < (int32_t)length) {
        int32_t chunk = SocketRecv(socket, buffer + session->recvLen, length - (size_t)received, srcAddr, addrLen);
        if (chunk == 0) {
            return NSTACKX_PEER_CLOSE;
        }
        if (chunk < 0) {
            return (errno != EAGAIN) ? NSTACKX_EFAILED : NSTACKX_EAGAIN;
        }
        received = received + chunk;
        session->recvLen = session->recvLen + (uint32_t)chunk;
    }

    return received;
}
433
/*
 * Receive one complete DFile frame (header plus payload) from the TCP stream.
 *
 * The call is resumable: session->recvLen records how many bytes of the
 * current frame have already arrived, so only the missing part of the header
 * and then of the payload is requested from TcpSocketRecv().
 *
 * Returns the total number of bytes of the frame (session->recvLen) once it is
 * complete, the non-positive result of TcpSocketRecv() when a receive did not
 * complete, or NSTACKX_EFAILED when the announced payload length is not
 * smaller than NSTACKX_RECV_BUFFER_LEN.
 */
int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    int32_t ret;
    uint16_t payloadLen;
    DFileFrameHeader *frameHeader = NULL;
    size_t length = sizeof(DFileFrameHeader);

    if (session->recvLen < length) {
        ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    /*
     * NOTE(review): the header is parsed from session->recvBuffer while the data
     * was received into buffer; this presumably relies on callers passing
     * session->recvBuffer as buffer. Confirm against the callers.
     */
    frameHeader = (DFileFrameHeader *)(session->recvBuffer);
    payloadLen = ntohs(frameHeader->length);
    /*
     * NOTE(review): this check does not add the header length; if the receive
     * buffer holds exactly NSTACKX_RECV_BUFFER_LEN bytes, header + payload could
     * exceed it. Verify the buffer size.
     */
    if (payloadLen >= NSTACKX_RECV_BUFFER_LEN) {
        /* Arguments are cast so that each one matches its %u conversion (length is a size_t). */
        DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", (uint32_t)length,
            (uint32_t)session->recvLen, (uint32_t)payloadLen, (uint32_t)frameHeader->type);
        return NSTACKX_EFAILED;
    }

    if ((session->recvLen - length) < payloadLen) {
        ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    return (int32_t)(session->recvLen);
}
465