1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <stdlib.h>
16
17 #include "nstackx_dfile_session.h"
18
19 #include "nstackx_dfile_log.h"
20 #include "nstackx_socket.h"
21
/* Log tag passed as the first argument to the DFILE_LOG* calls in this file. */
#define TAG "nStackXDFile"

#define WAIT_DATA_FRAME_WAIT_US 5 /* Spend 5us to read one file data frame. */

/* Number of IovList entries allocated in one batch (returned by GetIovListSize()). */
#define MAX_NR_IOVCNT 20
/* NOTE(review): not referenced in the visible part of this file; presumably the largest UDP payload size - confirm. */
#define MAX_UDP_PAYLOAD 65507
/* Number of blocks gathered per send round (returned by GetMaxSendCount()). */
#define MAX_SEND_COUNT 1
29
GetIovListSize(void)30 static inline uint32_t GetIovListSize(void)
31 {
32 // updated this value from 40 to 20 now
33 return MAX_NR_IOVCNT;
34 }
35
/*
 * Allocates one contiguous batch of GetIovListSize() IovList entries,
 * resets each entry (addr = NULL, len = 0) and appends it to the tail of head.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_ENOMEM if the allocation fails
 * (head is left untouched in that case).
 */
static int32_t AllocIovList(List *head)
{
    uint32_t count = GetIovListSize();
    IovList *batch = malloc(sizeof(IovList) * count);

    if (batch == NULL) {
        return NSTACKX_ENOMEM;
    }

    /* Walk the freshly allocated array and link every element into the list. */
    for (IovList *item = batch; item != batch + count; item++) {
        item->addr = NULL;
        item->len = 0;
        ListInsertTail(head, &item->entry);
    }

    return NSTACKX_EOK;
}
50
51 #ifndef BUILD_FOR_WINDOWS
52 __attribute__((unused))
53 #endif
GetFreeIovList(DFileSession * s,int32_t tid)54 static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
55 {
56 List *p = &s->freeIovList[tid];
57 List *q = NULL;
58
59 if (ListIsEmpty(p)) {
60 int32_t err = AllocIovList(p);
61 if (err != NSTACKX_EOK) {
62 return NULL;
63 }
64 }
65
66 q = ListPopFront(p);
67 return (IovList *)q;
68 }
69
/*
 * Unlinks every node on head, treating each node as a BlockFrame, and frees
 * both the frame's fileDataFrame buffer and the BlockFrame itself.
 *
 * s and tid are accepted for interface compatibility and are not used.
 */
void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
{
    List *cur = NULL;
    List *next = NULL;

    (void)s;
    (void)tid;

    LIST_FOR_EACH_SAFE(cur, next, head) {
        BlockFrame *frame = (BlockFrame *)cur;

        /* Detach before freeing so the list never references released memory. */
        ListRemoveNode(cur);
        free(frame->fileDataFrame);
        free(frame);
    }
}
85
/*
 * Sends the not-yet-sent tail of one block (len bytes in total, of which
 * block->sendLen were already sent) over socket.
 *
 * - Whole remainder sent: the node p is unlinked, the block and its frame are
 *   freed, the send counters are incremented and the positive byte count
 *   returned by SocketSend() is returned.
 * - Partial send: block->sendLen is advanced, eAgainCount is incremented and
 *   NSTACKX_EAGAIN is returned.
 * - Nothing sent and errno == EAGAIN: eAgainCount is incremented and
 *   NSTACKX_EAGAIN is returned.
 * - Anything else: the failure is logged and NSTACKX_EFAILED is returned.
 */
static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
{
    DFileSession *session = peerInfo->session;
    int32_t sent = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);

    if (sent > 0 && sent == (int32_t)(len - block->sendLen)) {
        /* The block is complete: release it and account for it. */
        block->sendLen = 0;
        ListRemoveNode(p);
        free(block->fileDataFrame);
        free(block);
        NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
        NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
        return sent;
    }

    if (sent > 0) {
        /* Short write: remember the progress so the next call resumes from here. */
        NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
        block->sendLen += (uint32_t)sent;
        return NSTACKX_EAGAIN;
    }

    if (errno == EAGAIN) {
        NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
        return NSTACKX_EAGAIN;
    }

    DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", sent, errno);
    return NSTACKX_EFAILED;
}
114
/*
 * Bookkeeping after one block was sent successfully on the UDP path:
 * unlinks node p, frees the frame f and its owning block, then increments
 * the peer's sendCount / intervalSendCount and the session's totalSendBlocks.
 */
static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
    BlockFrame *block)
{
    /* Release the block that has just left the socket. */
    ListRemoveNode(p);
    free(f);
    free(block);

    /* Account for it in the per-peer and per-session statistics. */
    NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
    NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
    NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
}
125
/*
 * Sends every block queued on head, in list order.
 *
 * With the TCP capability each block goes through TcpSendFileDataFrame() on
 * session->socket[0], or on session->acceptSocket for a server session.
 * Otherwise each block is sent as one datagram on
 * session->socket[peerInfo->socketIndex].
 *
 * Returns:
 * - NSTACKX_EAGAIN as soon as a send reports it; the unsent blocks are left
 *   on head for the caller to keep.
 * - otherwise the result of the last send attempt. Before returning, whatever
 *   is still on head (nothing after full success, the remaining blocks after
 *   a failure) is freed through DestroyIovList().
 * - NSTACKX_EOK when head is empty and nothing was attempted.
 */
static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    List *p = NULL;
    List *n = NULL;
    BlockFrame *block = NULL;
    FileDataFrameZS *f = NULL;
    /* Start from a defined result: the loop body may never run when head is empty. */
    int32_t ret = NSTACKX_EOK;
    uint16_t len;
    Socket *socket = session->socket[0];

    if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
        socket = session->acceptSocket;
    }

    LIST_FOR_EACH_SAFE(p, n, head) {
        block = (BlockFrame *)p;
        f = (FileDataFrameZS *)(void *)block->fileDataFrame;
        /* Total bytes on the wire: payload length from the header plus the header itself. */
        len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
        if (CapsTcp(session)) {
            ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
            if (ret == NSTACKX_EFAILED) {
                break;
            } else if (ret == NSTACKX_EAGAIN) {
                /* Keep the remaining blocks queued; do not destroy them. */
                return ret;
            }
        } else {
            ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
            if (ret > 0) {
                UdpSendFileDataSuccess(session, peerInfo, p, f, block);
            } else if (ret == NSTACKX_EAGAIN) {
                NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
                /* Keep the remaining blocks queued; do not destroy them. */
                return ret;
            } else {
                DFILE_LOGE(TAG, "socket sendto failed");
                break;
            }
        }
    }

    /* Drop anything that is still queued after a hard failure. */
    DestroyIovList(head, session, tid);

    return ret;
}
169
/*
 * Forwarding wrapper: hands all arguments to SendFileDataFrame() unchanged
 * and returns its result.
 */
static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    int32_t result = SendFileDataFrame(session, peerInfo, head, tid);

    return result;
}
174
/*
 * Initializes head as an empty list and moves up to maxCount nodes from the
 * front of unsent to the tail of head, preserving their order.
 *
 * Returns the number of nodes moved.
 */
static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
{
    int32_t moved;

    ListInitHead(head);
    for (moved = 0; moved < maxCount; moved++) {
        if (ListIsEmpty(unsent)) {
            break;
        }
        List *node = ListPopFront(unsent);
        if (node == NULL) {
            break;
        }
        ListInsertTail(head, node);
    }

    return moved;
}
191
GetMaxSendCount(void)192 static int32_t GetMaxSendCount(void)
193 {
194 return MAX_SEND_COUNT;
195 }
196
/*
 * Repeatedly tops up head with blocks read from the file manager and sends
 * them to the peer bound to socketIndex.
 *
 * count is the number of blocks already queued on head when called. Each
 * round reads until GetMaxSendCount() blocks are queued or the file manager
 * has no pending data, then sends the queue through SendFileDataFrameEx().
 *
 * The loop ends when
 * - nothing is queued (returns NSTACKX_EOK),
 * - the send returns a value <= 0 (that value is returned),
 * - with the TCP capability, session->sendRemain is set; otherwise, the peer's
 *   intervalSendCount has reached amendSendRate or session->closeFlag is set,
 * - or session->stopSendCnt[tid] is non-zero.
 * In the last two cases the positive result of the last send is returned.
 *
 * Returns NSTACKX_EFAILED immediately if no peer is found for socketIndex.
 */
static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (!peerInfo) {
        return NSTACKX_EFAILED;
    }

    BlockFrame *chain = NULL;
    int32_t budget = GetMaxSendCount();
    int32_t result;
    int32_t keepSending;

    for (;;) {
        /* Fill the queue up to the per-round budget. */
        while (count < budget && FileManagerHasPendingData(session->fileManager)) {
            result = FileManagerFileRead(session->fileManager, tid, &chain, budget - count);
            if (result < 0) {
                DFILE_LOGE(TAG, "FileManagerFileRead failed %d", result);
                break;
            }
            if (result == 0) {
                /* Data is pending but not readable yet: wait briefly and retry. */
                NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
                (void)usleep(WAIT_DATA_FRAME_WAIT_US);
                continue;
            }
            /* Append the returned blocks, following list.next until it is NULL. */
            while (chain) {
                List *following = chain->list.next;
                ListInsertTail(head, &chain->list);
                chain = (BlockFrame *)(void *)following;
            }
            count += result;
        }

        if (count == 0) {
            NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
            result = NSTACKX_EOK;
            break;
        }

        result = SendFileDataFrameEx(session, peerInfo, head, tid);
        if (result <= 0) {
            break;
        }

        count = 0;
        budget = GetMaxSendCount();

        if (CapsTcp(session)) {
            keepSending = (session->sendRemain ? 0 : 1);
        } else {
            keepSending = (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag);
        }

        if (!keepSending || session->stopSendCnt[tid] != 0) {
            break;
        }
    }

    return result;
}
244
245
246 /*
247 * * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
248 * * if backpress frame count is zero then send packet normally
249 * */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)250 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
251 {
252 uint32_t fileProcessCnt;
253 uint32_t sleepTime;
254 uint32_t stopCnt;
255 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
256 if (peerInfo == NULL) {
257 return;
258 }
259
260 if (session->stopSendCnt[tid] != 0) {
261 if (PthreadMutexLock(&session->backPressLock) != 0) {
262 DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
263 return;
264 }
265
266 stopCnt = session->stopSendCnt[tid];
267 if (stopCnt == 0) {
268 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
269 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
270 }
271 return;
272 }
273
274 /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
275 fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
276
277 session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
278 fileProcessCnt) : 0;
279
280 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
281 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
282 return;
283 }
284
285 sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
286
287 #ifndef NSTACKX_WITH_LITEOS
288 DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
289 fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
290 #endif
291 (void)usleep(sleepTime);
292 }
293 }
294
/*
 * Entry point for sending file data on thread slot tid to the peer bound to
 * socketIndex.
 *
 * Applies the back-pressure check, takes up to GetMaxSendCount() blocks from
 * unsent into a temporary queue and passes that queue to DoSendDataFrame().
 * If the result is NSTACKX_EAGAIN, the temporary queue is handed back to
 * unsent through ListMove().
 *
 * Returns NSTACKX_EFAILED if no peer is found, NSTACKX_EOK without sending
 * when the peer's amendSendRate is 0, otherwise DoSendDataFrame()'s result.
 */
int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        return NSTACKX_EFAILED;
    }
    if (peerInfo->amendSendRate == 0) {
        return NSTACKX_EOK;
    }

    CheckSendByBackPress(session, tid, socketIndex);

    List batch;
    int32_t limit = GetMaxSendCount();
    int32_t queued = CheckUnsentList(unsent, &batch, limit);
    int32_t result = DoSendDataFrame(session, &batch, queued, tid, socketIndex);
    if (result == NSTACKX_EAGAIN) {
        /* The socket is busy: give the untransmitted blocks back to the caller's list. */
        ListMove(&batch, unsent);
    }

    return result;
}
318
/*
 * Sends one queued control frame.
 *
 * Without the TCP capability the whole frame is sent on
 * session->socket[queueNode->socketIndex]; a result <= 0 other than
 * NSTACKX_EAGAIN is logged and mapped to NSTACKX_EFAILED.
 *
 * With the TCP capability the frame is sent from offset queueNode->sendLen on
 * the accept socket (server session) or socket[0] (otherwise):
 * - remainder fully sent: sendLen is reset to 0 and the byte count is returned;
 * - partial send: sendLen is advanced and NSTACKX_EAGAIN is returned;
 * - nothing sent and errno == EAGAIN: NSTACKX_EAGAIN is returned;
 * - otherwise the failure is logged and NSTACKX_EFAILED is returned.
 */
int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
{
    int32_t ret;

    if (!CapsTcp(session)) {
        uint8_t socketIndex = queueNode->socketIndex;

        ret = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
        if (ret <= 0 && ret != NSTACKX_EAGAIN) {
            DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
            ret = NSTACKX_EFAILED;
        }
        return ret;
    }

    Socket *socket = NULL;
    if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
        socket = session->acceptSocket;
    } else {
        socket = session->socket[0];
    }

    ret = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
    if (ret > 0 && ret == (int32_t)(queueNode->length - queueNode->sendLen)) {
        queueNode->sendLen = 0;
        return ret;
    }
    if (ret > 0) {
        /* Short write: keep the offset so the next attempt resumes here. */
        queueNode->sendLen += (uint32_t)ret;
        return NSTACKX_EAGAIN;
    }
    if (errno == EAGAIN) {
        return NSTACKX_EAGAIN;
    }

    DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
    return NSTACKX_EFAILED;
}
352
/*
 * Drains the session's outbound control-frame queue.
 *
 * Starts with *preQueueNode if it is non-NULL (a frame left over from an
 * earlier call), otherwise pops the next frame under outboundQueueLock.
 * Each frame is sent with SendControlFrame(); after a successful send the
 * node is destroyed, totalSendBlocks is incremented and the next frame is
 * fetched, until the queue is empty or session->closeFlag is set.
 *
 * For a UDP server socket the socket's dstAddr is set to the frame's peerAddr
 * before sending.
 *
 * On return, *preQueueNode holds the frame that hit NSTACKX_EAGAIN so it can
 * be retried; for every other result it is set to NULL and the current frame
 * (if any) is destroyed.
 *
 * Returns NSTACKX_EOK when the queue is empty, NSTACKX_EFAILED on a
 * lock/unlock failure, otherwise the last SendControlFrame() result.
 */
int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
{
    QueueNode *pending = *preQueueNode;
    int32_t ret;

    for (;;) {
        if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex lock failed");
            ret = NSTACKX_EFAILED;
            break;
        }
        if (pending == NULL && session->outboundQueueSize) {
            pending = (QueueNode *)ListPopFront(&session->outboundQueue);
            session->outboundQueueSize--;
        }
        if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex unlock failed");
            ret = NSTACKX_EFAILED;
            break;
        }

        if (pending == NULL) {
            /* Nothing left to send. */
            ret = NSTACKX_EOK;
            break;
        }

        uint32_t socketIndex = pending->socketIndex;
        Socket *target = session->socket[socketIndex];
        if (target->protocol == NSTACKX_PROTOCOL_UDP && target->isServer == NSTACKX_TRUE) {
            target->dstAddr = pending->peerAddr;
        }

        ret = SendControlFrame(session, pending);
        if (ret <= 0) {
            break;
        }

        /* Send ok, try to get next frame. */
        DestroyQueueNode(pending);
        pending = NULL;
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);

        if (session->closeFlag) {
            break;
        }
    }

    if (ret == NSTACKX_EAGAIN) {
        /* Hand the unfinished frame back so the caller can retry it. */
        *preQueueNode = pending;
        return ret;
    }

    *preQueueNode = NULL;
    DestroyQueueNode(pending);
    return ret;
}
403
/*
 * Receives length more bytes into buffer, starting at offset session->recvLen,
 * from the accept socket (server session) or socket[0] (otherwise).
 *
 * session->recvLen is advanced by every chunk received, including chunks
 * received before an early return.
 *
 * Returns the number of bytes received by this call once length bytes have
 * arrived, NSTACKX_PEER_CLOSE if SocketRecv() returns 0, NSTACKX_EAGAIN if it
 * fails with errno == EAGAIN, and NSTACKX_EFAILED on any other failure.
 */
int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    Socket *socket = session->socket[0];
    if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
        socket = session->acceptSocket;
    }

    int received = 0;
    while (received < (int32_t)length) {
        int32_t chunk = SocketRecv(socket, buffer + session->recvLen, length - (size_t)received, srcAddr, addrLen);
        if (chunk == 0) {
            return NSTACKX_PEER_CLOSE;
        }
        if (chunk < 0) {
            return (errno != EAGAIN) ? NSTACKX_EFAILED : NSTACKX_EAGAIN;
        }
        received += chunk;
        session->recvLen += (uint32_t)chunk;
    }

    return received;
}
435
/*
 * Receives one complete DFile frame (header plus payload) from a TCP stream,
 * resuming from session->recvLen bytes already received.
 *
 * First the rest of the DFileFrameHeader is read, then the payload length is
 * taken from the header and the rest of the payload is read.
 *
 * Returns session->recvLen (header + payload bytes) once the frame is
 * complete. Returns TcpSocketRecv()'s result when it is <= 0 (for example
 * NSTACKX_EAGAIN or NSTACKX_PEER_CLOSE); the progress is kept in
 * session->recvLen. Returns NSTACKX_EFAILED if the announced payload does
 * not fit in NSTACKX_RECV_BUFFER_LEN after the header.
 */
int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    int32_t ret;
    uint16_t payloadLen;
    DFileFrameHeader *frameHeader = NULL;
    size_t length = sizeof(DFileFrameHeader);

    /* Step 1: make sure the full header has arrived. */
    if (session->recvLen < length) {
        ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    /*
     * NOTE(review): the header is parsed from session->recvBuffer while the
     * bytes are received into buffer; presumably callers pass
     * session->recvBuffer as buffer - confirm.
     */
    frameHeader = (DFileFrameHeader *)(session->recvBuffer);
    payloadLen = ntohs(frameHeader->length);
    if (payloadLen > NSTACKX_RECV_BUFFER_LEN - length) {
        /* length is a size_t; cast it so the argument matches the %u conversion. */
        DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", (uint32_t)length,
            session->recvLen, payloadLen, frameHeader->type);
        return NSTACKX_EFAILED;
    }

    /* Step 2: read whatever part of the payload is still missing. */
    if ((session->recvLen - length) < payloadLen) {
        ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    return (int32_t)(session->recvLen);
}
467