1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_send.h"
17 #include "nstackx_dfile_mp.h"
18 #include "nstackx_dfile_transfer.h"
19 #include "nstackx_file_manager.h"
20 #include "nstackx_log.h"
21 #include "securec.h"
22 #define TAG "nStackXDfileMp"
23
/*
 * Receive one chunk of data for the session and pass it to DFileSessionHandleReadBuffer().
 *
 * When CapsTcp(session) holds:
 *   - a server session whose acceptFlag is still 0 only runs DFileAcceptSocket() and returns its result;
 *   - otherwise data is read into session->recvBuffer through SocketRecvForTcp(), and
 *     NSTACKX_PEER_CLOSE from that call is returned to the caller unchanged.
 * Otherwise a single frame is read from session->socket[0] into a local buffer through SocketRecv().
 *
 * Returns NSTACKX_EAGAIN when the receive call reported NSTACKX_EAGAIN, NSTACKX_EFAILED for any other
 * non-positive receive result, and otherwise the result of DFileSessionHandleReadBuffer().
 */
int32_t DFileSocketRecvSP(DFileSession *session)
{
    uint8_t frame[NSTACKX_MAX_FRAME_SIZE] = {0};
    struct sockaddr_in peerAddr;
    socklen_t addrLen = sizeof(struct sockaddr_in);
    int32_t recvRet;
    int32_t handleRet;

    (void)memset_s(&peerAddr, addrLen, 0, addrLen);

    if (!CapsTcp(session)) {
        recvRet = SocketRecv(session->socket[0], frame, sizeof(frame), &peerAddr, &addrLen);
    } else {
        /* A server that has not accepted a connection yet has nothing to read. */
        if ((session->sessionType == DFILE_SESSION_TYPE_SERVER) && (session->acceptFlag == 0)) {
            return DFileAcceptSocket(session);
        }
        recvRet = SocketRecvForTcp(session, session->recvBuffer, &peerAddr, &addrLen);
        if (recvRet == NSTACKX_PEER_CLOSE) {
            return recvRet;
        }
    }

    if (recvRet <= 0) {
        if (recvRet == NSTACKX_EAGAIN) {
            return NSTACKX_EAGAIN;
        }
        LOGE(TAG, "socket recv failed");
        return NSTACKX_EFAILED;
    }

    NSTACKX_ATOM_FETCH_INC(&session->totalRecvBlocks);

    if (CapsTcp(session)) {
        /* The TCP path consumes session->recvLen bytes from the session buffer, then resets the length. */
        handleRet = DFileSessionHandleReadBuffer(session, session->recvBuffer, (size_t)session->recvLen,
                                                 &peerAddr, 0);
        session->recvLen = 0;
    } else {
        handleRet = DFileSessionHandleReadBuffer(session, frame, (size_t)recvRet, &peerAddr, 0);
    }

    if (handleRet != NSTACKX_EOK) {
        LOGE(TAG, "handle read buffer failed");
    }
    return handleRet;
}
68
TransSelectPeerInfo(DFileSession * session)69 PeerInfo *TransSelectPeerInfo(DFileSession *session)
70 {
71 PeerInfo *peerInfo = (PeerInfo *)ListGetFront(&session->peerInfoChain);
72 return peerInfo;
73 }
74
75 /* only for client */
ClientGetPeerInfoByTransId(DFileSession * session)76 PeerInfo *ClientGetPeerInfoByTransId(DFileSession *session)
77 {
78 PeerInfo *peerInfo = (PeerInfo *)ListGetFront(&session->peerInfoChain);
79 return peerInfo;
80 }
81
82 /* only for client */
ClientGetPeerInfoBySocketIndex(uint8_t socketIndex,const DFileSession * session)83 PeerInfo *ClientGetPeerInfoBySocketIndex(uint8_t socketIndex, const DFileSession *session)
84 {
85 PeerInfo *peerInfo = NULL;
86 List *pos = NULL;
87
88 LIST_FOR_EACH(pos, &session->peerInfoChain) {
89 peerInfo = (PeerInfo *)pos;
90 if (peerInfo->socketIndex == socketIndex) {
91 return peerInfo;
92 }
93 }
94 return NULL;
95 }
96
/*
 * Start the sender thread for socket index 0.
 *
 * A SenderThreadPara holding the session and socket index 0 is heap-allocated and passed to
 * DFileSenderHandle through PthreadCreate(); the thread id is stored in session->senderTid[0].
 * This function frees the parameter block only when thread creation fails.
 *
 * Returns NSTACKX_ENOMEM if the allocation fails, NSTACKX_EFAILED if the thread cannot be created,
 * and NSTACKX_EOK otherwise.
 */
int32_t CreateSenderThread(DFileSession *session)
{
    SenderThreadPara *threadArg = malloc(sizeof(*threadArg));
    if (threadArg == NULL) {
        return NSTACKX_ENOMEM;
    }

    threadArg->session = session;
    threadArg->socketIndex = 0;

    if (PthreadCreate(&(session->senderTid[0]), NULL, DFileSenderHandle, threadArg) == 0) {
        return NSTACKX_EOK;
    }

    LOGE(TAG, "Create sender thread 0 failed");
    free(threadArg);
    return NSTACKX_EFAILED;
}
115
/*
 * Fill rebuildList with a single transfer that references the caller's file and remote-path strings.
 *
 * files       - array of fileNum file path pointers; the pointers are copied, not the strings.
 * remotePath  - optional array of fileNum remote path pointers; may be NULL, in which case the
 *               remotePath entries of rebuildList stay zeroed.
 * fileNum     - number of entries in files (and in remotePath when it is given).
 * session     - session whose allTaskCount is checked against NSTACKX_MAX_FILE_LIST_NUM.
 * rebuildList - output; zeroed and then populated, with transNum set to 1.
 *
 * Returns NSTACKX_EFAILED when the session already has NSTACKX_MAX_FILE_LIST_NUM or more tasks, or when
 * fileNum does not fit into the arrays of rebuildList (rebuildList is left untouched in both cases);
 * returns NSTACKX_EOK otherwise.
 */
int32_t RebuildFilelist(const char *files[], const char *remotePath[], uint32_t fileNum,
    DFileSession *session, DFileRebuildFileList *rebuildList)
{
    const size_t filesCapacity = sizeof(rebuildList->files) / sizeof(rebuildList->files[0]);
    const size_t remotePathCapacity = sizeof(rebuildList->remotePath) / sizeof(rebuildList->remotePath[0]);

    if (session->allTaskCount >= NSTACKX_MAX_FILE_LIST_NUM) {
        LOGI(TAG, "more than %d send task", NSTACKX_MAX_FILE_LIST_NUM);
        return NSTACKX_EFAILED;
    }

    /* Reject counts that would make the copy loop below write past the end of the list arrays. */
    if ((fileNum > filesCapacity) || ((remotePath != NULL) && (fileNum > remotePathCapacity))) {
        LOGE(TAG, "file number %u exceeds rebuild list capacity", fileNum);
        return NSTACKX_EFAILED;
    }

    (void)memset_s(rebuildList, sizeof(DFileRebuildFileList), 0, sizeof(DFileRebuildFileList));

    rebuildList->transNum = 1;
    for (uint32_t i = 0; i < fileNum; i++) {
        rebuildList->files[i] = files[i];
        if (remotePath != NULL) {
            rebuildList->remotePath[i] = remotePath[i];
        }
    }
    return NSTACKX_EOK;
}
135
/*
 * Initialise the first entry of session->outboundQueueWait through SemInit(), passing 0 for both
 * remaining arguments.
 * Returns NSTACKX_EFAILED when SemInit() reports a non-zero result, NSTACKX_EOK otherwise.
 */
int32_t InitOutboundQueueWait(DFileSession *session)
{
    const uint32_t queueSlot = 0;

    return (SemInit(&session->outboundQueueWait[queueSlot], 0, 0) != 0) ? NSTACKX_EFAILED : NSTACKX_EOK;
}
144
/* Release the first entry of session->outboundQueueWait through SemDestroy(). */
void DestroyOutboundQueueWait(DFileSession *session)
{
    const uint32_t queueSlot = 0;

    SemDestroy(&session->outboundQueueWait[queueSlot]);
}
149
/* Post the first entry of session->outboundQueueWait through SemPost(). */
void PostOutboundQueueWait(DFileSession *session)
{
    const uint32_t queueSlot = 0;

    SemPost(&session->outboundQueueWait[queueSlot]);
}
154