• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "vtp_stream_socket.h"
17 
18 #include <chrono>
19 #include <ifaddrs.h>
20 #include <memory>
21 #include <netinet/in.h>
22 #include <securec.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <thread>
26 
27 #include "fillpinc.h"
28 #include "raw_stream_data.h"
29 #include "stream_common_data.h"
30 #include "session.h"
31 #include "softbus_adapter_timer.h"
32 #include "softbus_adapter_crypto.h"
33 #include "softbus_adapter_socket.h"
34 #include "softbus_errcode.h"
35 #include "softbus_log.h"
36 #include "softbus_trans_def.h"
37 #include "stream_depacketizer.h"
38 #include "stream_packetizer.h"
39 
40 namespace Communication {
41 namespace SoftBus {
42 bool g_logOn = false;
43 const int FEED_BACK_PERIOD = 1;  /* feedback period of fillp stream traffic statistics is 1s */
44 const int MS_PER_SECOND = 1000;
45 const int US_PER_MS = 1000;
46 
47 namespace {
PrintOptionInfo(int type,const StreamAttr & value)48 void PrintOptionInfo(int type, const StreamAttr &value)
49 {
50     switch (value.GetType()) {
51         case INT_TYPE:
52             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
53                 "Int option: type:%d, value:%d", type, value.GetIntValue());
54             break;
55         case BOOL_TYPE:
56             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
57                 "Bool option: type:%d, value:%d", type, value.GetBoolValue());
58             break;
59         case STRING_TYPE:
60             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
61                 "String option: type:%d, value:%s", type, value.GetStrValue().c_str());
62             break;
63         default:
64             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Wrong StreamAttr!");
65             (void)type;
66     }
67 }
68 } // namespace
69 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
70 
71 std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
72 std::mutex VtpStreamSocket::g_streamSocketLockMapLock_;
73 std::map<int, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
74 std::mutex VtpStreamSocket::g_streamSocketMapLock_;
75 
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)76 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo* frameInfo,
77     const Communication::SoftBus::StreamFrameInfo* streamFrameInfo)
78 {
79     frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
80     frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
81     frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
82     frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
83     frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
84     frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
85 }
86 
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)87 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
88 {
89     std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
90     if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
91         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamsocketlock for fd = %d already exists", fd);
92         return;
93     }
94 
95     g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
96 }
97 
AddStreamSocketListener(int fd,std::shared_ptr<VtpStreamSocket> streamreceiver)98 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
99 {
100     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
101     if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
102         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamreceiver for fd = %d already exists", fd);
103         return;
104     }
105 
106     g_streamSocketMap.emplace(std::pair<int, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
107 }
108 
RemoveStreamSocketLock(int fd)109 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
110 {
111     std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
112     if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
113         g_streamSocketLockMap.erase(fd);
114         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamsocketlock for fd = %d success", fd);
115     } else {
116         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
117             "Streamsocketlock for fd = %d not exist in the map", fd);
118     }
119     return;
120 }
121 
RemoveStreamSocketListener(int fd)122 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
123 {
124     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
125     if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
126         g_streamSocketMap.erase(fd);
127         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamreceiver for fd = %d success", fd);
128     } else {
129         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Streamreceiver for fd = %d not exist in the map", fd);
130     }
131     return;
132 }
133 
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)134 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
135 {
136     OptionFunc fun = {
137         valueType, set, get
138     };
139     optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
140 }
141 
VtpStreamSocket()142 VtpStreamSocket::VtpStreamSocket()
143 {
144     InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
145     InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
146     InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
147     InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
148     InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
149     InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
150     InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
151     InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
152     InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
153     InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
154     InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
155     InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
156     InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
157         &VtpStreamSocket::GetVtpStackConfig);
158     InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
159         &VtpStreamSocket::GetVtpStackConfig);
160     InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
161         &VtpStreamSocket::GetVtpStackConfig);
162     InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
163         &VtpStreamSocket::GetVtpStackConfig);
164     InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
165         &VtpStreamSocket::GetVtpStackConfig);
166     InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
167         &VtpStreamSocket::GetVtpStackConfig);
168     InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
169         &VtpStreamSocket::GetVtpStackConfig);
170     InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
171         &VtpStreamSocket::GetVtpStackConfig);
172     InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
173         &VtpStreamSocket::GetVtpStackConfig);
174     InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
175         &VtpStreamSocket::GetVtpStackConfig);
176     InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
177         &VtpStreamSocket::GetVtpStackConfig);
178     InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
179         &VtpStreamSocket::GetVtpStackConfig);
180     InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
181     InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
182     InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
183     InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
184 
185     scene_ = UNKNOWN_SCENE;
186 }
187 
~VtpStreamSocket()188 VtpStreamSocket::~VtpStreamSocket()
189 {
190     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "~VtpStreamSocket");
191 }
192 
GetSelf()193 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
194 {
195     return shared_from_this();
196 }
197 
HandleFillpFrameStats(int fd,const FtEventCbkInfo * info)198 int VtpStreamSocket::HandleFillpFrameStats(int fd, const FtEventCbkInfo *info)
199 {
200     StreamSendStats stats = {};
201     if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
202         sizeof(info->info.frameSendStats)) != EOK) {
203         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamStats info memcpy fail");
204         return -1;
205     }
206 
207     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
208     auto itListener = g_streamSocketMap.find(fd);
209     if (itListener != g_streamSocketMap.end()) {
210         if (itListener->second->streamReceiver_ != nullptr) {
211             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "OnFrameStats enter");
212             itListener->second->streamReceiver_->OnFrameStats(&stats);
213         } else {
214             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamReceiver_ is nullptr");
215         }
216     } else {
217         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty in the map", fd);
218     }
219     return 0;
220 }
221 
HandleRipplePolicy(int fd,const FtEventCbkInfo * info)222 int VtpStreamSocket::HandleRipplePolicy(int fd, const FtEventCbkInfo *info)
223 {
224     TrafficStats stats;
225     (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
226     if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
227         sizeof(info->info.trafficData.stats)) != EOK) {
228         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "RipplePolicy info memcpy fail");
229         return -1;
230     }
231     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
232     auto itListener = g_streamSocketMap.find(fd);
233     if (itListener != g_streamSocketMap.end()) {
234         if (itListener->second->streamReceiver_ != nullptr) {
235             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "OnRippleStats enter");
236             itListener->second->streamReceiver_->OnRippleStats(&stats);
237         } else {
238             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "OnRippleStats streamReceiver_ is nullptr");
239         }
240     } else {
241         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
242             "OnRippleStats streamReceiver for fd = %d is empty in the map", fd);
243     }
244     return 0;
245 }
246 
247 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int fd,const FtEventCbkInfo * info,QosTv * metricList)248 void VtpStreamSocket::FillSupportDet(int fd, const FtEventCbkInfo *info, QosTv *metricList)
249 {
250     if (info == nullptr) {
251         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "stats info is nullptr");
252         return;
253     }
254     if (info->evt == FT_EVT_BW_DET) {
255         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
256             "[Metric Return]: Fillp bandwidth information of socket(%d) is returned", fd);
257         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
258             "[Metric Return]: Changed amount of current available bandwidth is: %d", info->info.bwInfo.bwStat);
259         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
260             "[Metric Return]: Current bandwidth for receiving data is: %d kbps", info->info.bwInfo.rate);
261         metricList->type = BANDWIDTH_ESTIMATE_VALUE;
262         metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
263         metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
264     }
265     if (info->evt == FT_EVT_JITTER_DET) {
266         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
267             "[Metric Return]: Fillp connection quality information of socket(%d) is returned", fd);
268         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
269             "[Metric Return]: Predeicted network condition is: %d", info->info.jitterInfo.jitterLevel);
270         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
271             "[Metric Return]: Current available receiving buffer is: %d ms", info->info.jitterInfo.bufferTime);
272         metricList->type = JITTER_DETECTION_VALUE;
273         metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
274         metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
275     }
276 }
277 #endif
278 
279 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int fd,const FtEventCbkInfo * info)280 int VtpStreamSocket::FillpStatistics(int fd, const FtEventCbkInfo *info)
281 {
282     if (info == nullptr) {
283         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "stats info is nullptr");
284         return -1;
285     }
286     if (info->evt == FT_EVT_FRAME_STATS) {
287         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv fillp frame stats");
288         return HandleFillpFrameStats(fd, info);
289     } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
290         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv fillp traffic data");
291         return HandleRipplePolicy(fd, info);
292     }
293 #ifdef FILLP_SUPPORT_BW_DET
294     if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
295         int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
296         int16_t tvCount = 1;
297         QosTv metricList = {};
298 
299         FillSupportDet(fd, info, &metricList);
300         metricList.info.wifiChannelInfo = {};
301         metricList.info.frameStatusInfo = {};
302 
303         std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
304         auto itLock = g_streamSocketLockMap.find(fd);
305         if (itLock != g_streamSocketLockMap.end()) {
306             std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
307             auto itListener = g_streamSocketMap.find(fd);
308             if (itListener != g_streamSocketMap.end()) {
309                 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
310                     std::lock_guard<std::mutex> guard(itLock->second);
311                     itListener->second->OnQosEvent(eventId, tvCount, &metricList);
312                 }).detach();
313             } else {
314                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty in the map", fd);
315             }
316         } else {
317             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamSocketLock for fd = %d is empty in the map", fd);
318         }
319     } else {
320         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
321             "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
322         return -1;
323     }
324 #endif
325     return 0;
326 }
327 
FillpAppStatistics()328 void VtpStreamSocket::FillpAppStatistics()
329 {
330     int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
331     int16_t tvCount = 1;
332     QosTv metricList = {};
333     FillpStatisticsPcb fillpPcbStats = {};
334     SoftBusSysTime fillpStatsGetTime = {0};
335 
336     int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
337     SoftBusGetTime(&fillpStatsGetTime);
338     if (getStatisticsRet == 0) {
339         metricList.type = STREAM_TRAFFIC_STASTICS;
340         metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
341             MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
342         metricList.info.appStatistics.periodRecvBits =
343             static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
344         metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
345         metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
346         metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
347         metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
348         metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
349         metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
350         metricList.info.appStatistics.periodRecvPktLossHighPrecision =
351             fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
352         metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
353         metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
354         metricList.info.appStatistics.periodSendPktLossHighPrecision =
355             fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
356         metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
357         metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
358 
359         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
360             "Succeed to get fillp statistics information for streamfd = %d", streamFd_);
361         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
362             "[Metric Return]: periodRtt is: %d", fillpPcbStats.appFcStastics.periodRtt);
363 
364         std::lock_guard<std::mutex> guard(streamSocketLock_);
365 
366         if (streamReceiver_ != nullptr) {
367             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
368                 "[Metric Notify]: Fillp traffic statistics information of socket(%d) is notified", streamFd_);
369             streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
370         } else {
371             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty", streamFd_);
372         }
373     } else {
374         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
375             "Fail to get fillp statistics information for streamfd = %d, errorcode = %d", streamFd_, FtGetErrno());
376     }
377 }
378 
CreateClient(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)379 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
380 {
381     int fd = CreateAndBindSocket(local);
382     if (fd == -1) {
383         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "CreateAndBindSocket failed, errorcode:%d", FtGetErrno());
384         DestroyStreamSocket();
385         return false;
386     }
387 
388     sessionKey_.second = sessionKey.second;
389     if (sessionKey_.first == nullptr) {
390         sessionKey_.first = new uint8_t[sessionKey_.second];
391     }
392     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
393         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
394         return false;
395     }
396 
397     streamType_ = streamType;
398     std::lock_guard<std::mutex> guard(streamSocketLock_);
399     streamFd_ = fd;
400     configCv_.notify_all();
401 
402     SetDefaultConfig(fd);
403     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
404         "Success to create a client socket(%d) of stream type(%d)", fd, streamType);
405     return true;
406 }
407 
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)408 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
409     std::pair<uint8_t*, uint32_t> sessionKey)
410 {
411     if (!CreateClient(local, streamType, sessionKey)) {
412         return false;
413     }
414     /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
415 #ifdef FILLP_SUPPORT_BW_DET
416     bool isServer = false;
417     EnableBwEstimationAlgo(streamFd_, isServer);
418 #endif
419 
420     bool connectRet = Connect(remote);
421     if (connectRet) {
422         bool isServer = false;
423         RegisterMetricCallback(isServer); /* register the callback function */
424     }
425     return connectRet;
426 }
427 
CreateServer(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)428 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
429 {
430     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "CreateVtpServer start");
431     listenFd_ = CreateAndBindSocket(local);
432     if (listenFd_ == -1) {
433         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "create listenFd failed, errorcode %d", FtGetErrno());
434         DestroyStreamSocket();
435         return false;
436     }
437 
438     bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
439     if (ret != 0) {
440         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtListen failed, ret :%d errorcode %d", ret, FtGetErrno());
441         DestroyStreamSocket();
442         return false;
443     }
444 
445     epollFd_ = FtEpollCreate();
446     if (epollFd_ < 0) {
447         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
448         DestroyStreamSocket();
449         return false;
450     }
451     isStreamRecv_ = true;
452     streamType_ = streamType;
453     sessionKey_.second = sessionKey.second;
454     if (sessionKey_.first == nullptr) {
455         sessionKey_.first = new uint8_t[sessionKey_.second];
456     }
457     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
458         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
459         return false;
460     }
461     auto self = this->GetSelf();
462     std::thread([self]() { self->NotifyStreamListener(); }).detach();
463 
464     std::thread([self]() {
465         if (!self->Accept()) {
466             self->DestroyStreamSocket();
467             return;
468         }
469         self->DoStreamRecv();
470         self->DestroyStreamSocket();
471     }).detach();
472 
473     bool &isDestroyed = isDestroyed_;
474     std::thread([self, &isDestroyed]() {
475         while (!isDestroyed) {
476             self->FillpAppStatistics();
477             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
478         }
479     }).detach();
480 
481     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
482         "CreateServer end, listenFd:%d, epollFd:%d, streamType:%d", listenFd_, epollFd_, streamType_);
483     return true;
484 }
485 
DestroyStreamSocket()486 void VtpStreamSocket::DestroyStreamSocket()
487 {
488     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket start");
489     std::lock_guard<std::mutex> guard(streamSocketLock_);
490     if (isDestroyed_) {
491         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "StreamSocket is already destroyed");
492         return;
493     }
494     if (listenFd_ != -1) {
495         FtClose(listenFd_);
496         listenFd_ = -1;
497     }
498 
499     if (streamFd_ != -1) {
500         RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
501         RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
502         FtClose(streamFd_);
503         streamFd_ = -1;
504     }
505 
506     if (epollFd_ != -1) {
507         FtClose(epollFd_);
508         epollFd_ = -1;
509     }
510 
511     if (streamReceiver_ != nullptr) {
512         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket receiver delete");
513         streamReceiver_->OnStreamStatus(STREAM_CLOSED);
514         streamReceiver_.reset();
515     }
516 
517     QuitStreamBuffer();
518     vtpInstance_->UpdateSocketStreamCount(false);
519     isDestroyed_ = true;
520     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket end");
521 }
522 
Connect(const IpAndPort & remote)523 bool VtpStreamSocket::Connect(const IpAndPort &remote)
524 {
525     if (remote.ip.empty()) {
526         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "remote addr  error, ip is nullptr");
527         DestroyStreamSocket();
528         return false;
529     }
530 
531     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
532         "Connect to server(server port:%d)", remote.port);
533     remoteIpPort_ = remote;
534 
535     struct sockaddr_in remoteSockAddr;
536     remoteSockAddr.sin_family = AF_INET;
537     remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
538     remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
539 
540     int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
541     if (ret != 0) {
542         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtConnect failed, ret :%d, errorno: %d", ret, FtGetErrno());
543         DestroyStreamSocket();
544         return false;
545     }
546 
547     epollFd_ = FtEpollCreate();
548     if (epollFd_ < 0) {
549         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
550         DestroyStreamSocket();
551         return false;
552     }
553 
554     if (SetSocketEpollMode(streamFd_) != ERR_OK) {
555         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", streamFd_);
556         DestroyStreamSocket();
557         return false;
558     }
559     isStreamRecv_ = true;
560     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to connect remote, and create a thread to recv data.");
561 
562     auto self = this->GetSelf();
563     std::thread([self]() { self->NotifyStreamListener(); }).detach();
564 
565     std::thread([self]() {
566         self->DoStreamRecv();
567         self->DestroyStreamSocket();
568     }).detach();
569 
570     bool &isDestroyed = isDestroyed_;
571     std::thread([self, &isDestroyed]() {
572         while (!isDestroyed) {
573             self->FillpAppStatistics();
574             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
575         }
576     }).detach();
577     return true;
578 }
579 
Send(std::unique_ptr<IStream> stream)580 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
581 {
582     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "send in..., streamType:%d, data size:%zd, ext size:%zd", streamType_,
583         stream->GetBufferLen(), stream->GetExtBufferLen());
584 
585     if (!isBlocked_) {
586         isBlocked_ = true;
587         if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
588             return false;
589         }
590     }
591 
592     int ret = -1;
593     std::unique_ptr<char[]> data = nullptr;
594     ssize_t len = 0;
595 
596     if (streamType_ == RAW_STREAM) {
597         data = stream->GetBuffer();
598         len = stream->GetBufferLen();
599 
600         ret = FtSend(streamFd_, data.get(), len, 0);
601     } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
602         const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
603         if (streamFrameInfo == nullptr) {
604             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamFrameInfo == nullptr");
605             return false;
606         }
607 
608         StreamPacketizer packet(streamType_, std::move(stream));
609         auto plainData = packet.PacketizeStream();
610         if (plainData == nullptr) {
611             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "PacketizeStream failed");
612             return false;
613         }
614         len = packet.GetPacketLen() + GetEncryptOverhead();
615         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
616             "packet.GetPacketLen() = %zd, GetEncryptOverhead() = %zd", packet.GetPacketLen(), GetEncryptOverhead());
617         data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
618         ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
619         if (encLen != len) {
620             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
621                 "encrypted failed, dataLen = %zd, encryptLen = %zd", len, encLen);
622             return false;
623         }
624         InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
625         len += FRAME_HEADER_LEN;
626 
627         FrameInfo frameInfo;
628         ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
629         ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
630     }
631 
632     if (ret == -1) {
633         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "send failed, errorno: %d", FtGetErrno());
634         return false;
635     }
636 
637     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "send out..., streamType:%d, data size:%zd", streamType_, len);
638     return true;
639 }
640 
SetOption(int type,const StreamAttr & value)641 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
642 {
643     PrintOptionInfo(type, value);
644     auto it = optFuncMap_.find(type);
645     if (it == optFuncMap_.end()) {
646         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "not found type = %d", type);
647         return false;
648     }
649 
650     if (value.GetType() != it->second.valueType) {
651         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN,
652             "type = %d, value.type = %d", value.GetType(), it->second.valueType);
653         return false;
654     }
655 
656     MySetFunc set = it->second.set;
657     if (set == nullptr) {
658         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "set is nullptr");
659         return false;
660     }
661 
662     if (type == NON_BLOCK || type == TOS) {
663         return (this->*set)(static_cast<int>(streamFd_), value);
664     }
665 
666     auto outerIt = FILLP_TYPE_MAP.find(type);
667     if (outerIt != FILLP_TYPE_MAP.end()) {
668         return (this->*set)(outerIt->second, value);
669     }
670 
671     auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
672     if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
673         return (this->*set)(innerIt->second, value);
674     }
675 
676     return (this->*set)(static_cast<int>(type), value);
677 }
678 
GetOption(int type) const679 StreamAttr VtpStreamSocket::GetOption(int type) const
680 {
681     StreamAttr attr {};
682     auto it = optFuncMap_.find(type);
683     if (it != optFuncMap_.end()) {
684         MyGetFunc get = it->second.get;
685         if (get == nullptr) {
686             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Can not get option:%d", type);
687             return std::move(StreamAttr());
688         }
689         if (type == NON_BLOCK) {
690             attr = (this->*get)(static_cast<int>(streamFd_));
691         } else {
692             attr = (this->*get)(static_cast<int>(type));
693         }
694     }
695 
696     PrintOptionInfo(type, attr);
697     return attr;
698 }
699 
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)700 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
701 {
702     if (receiver == nullptr) {
703         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "receiver is nullptr");
704         return false;
705     }
706 
707     std::lock_guard<std::mutex> guard(streamSocketLock_);
708     streamReceiver_ = receiver;
709     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set receiver success");
710     return true;
711 }
712 
InitVtpInstance(const std::string & pkgName)713 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
714 {
715     return vtpInstance_->InitVtp(pkgName);
716 }
717 
DestroyVtpInstance(const std::string & pkgName)718 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
719 {
720     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance start");
721     vtpInstance_->DestroyVtp(pkgName);
722     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance end");
723 }
724 
CreateAndBindSocket(IpAndPort & local)725 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local)
726 {
727     localIpPort_ = local;
728     vtpInstance_->UpdateSocketStreamCount(true);
729     if (local.ip.empty()) {
730         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "ip is empty");
731         return -1;
732     }
733 
734     int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
735     if (sockFd == -1) {
736         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtSocket failed, errorcode = %d", FtGetErrno());
737         return -1;
738     }
739 
740     // bind
741     sockaddr_in localSockAddr = {0};
742     localSockAddr.sin_family = AF_INET;
743     localSockAddr.sin_port = htons((short)local.port);
744     localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
745 
746     socklen_t localAddrLen = sizeof(localSockAddr);
747     int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
748     if (ret == -1) {
749         FtClose(sockFd);
750         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtBind failed, errorcode %d", FtGetErrno());
751         return -1;
752     }
753 
754     // 获取port
755     ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
756     if (ret != ERR_OK) {
757         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "getsockname error ret: %d, errorcode :%d", ret, FtGetErrno());
758         FtClose(sockFd);
759         return -1;
760     }
761 
762     char host[ADDR_MAX_SIZE];
763     localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
764     localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
765     local.port = localIpPort_.port;
766 
767     if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
768         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketBoundInner failed, errorcode :%d", FtGetErrno());
769     }
770     return sockFd;
771 }
772 
773 
EnableBwEstimationAlgo(int streamFd,bool isServer) const774 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
775 {
776 #ifdef FILLP_SUPPORT_BW_DET
777     int errBwDet;
778     if (isServer) {
779         int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
780         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
781             &enableBwDet, sizeof(enableBwDet));
782     } else {
783         int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
784         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
785             &enableBwDet, sizeof(enableBwDet));
786     }
787     if (errBwDet < 0) {
788         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
789             "Fail to enable bandwidth estimation algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
790         return true;
791     } else {
792         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
793             "Success to enable bandwidth estimation algorithm for stream: %d", streamFd);
794         return false;
795     }
796 #else
797     return true;
798 #endif
799 }
800 
EnableJitterDetectionAlgo(int streamFd) const801 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
802 {
803 #ifdef FILLP_SUPPORT_CQE
804     int32_t  enableCQE = FILLP_CQE_ENABLE;
805     auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
806     if (errCQE < 0) {
807         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
808             "Fail to enable CQE algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
809         return true;
810     } else {
811         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
812             "Success to enable CQE algorithm for stream: %d", streamFd);
813         return false;
814     }
815 #else
816     return true;
817 #endif
818 }
819 
EnableDirectlySend(int streamFd) const820 bool VtpStreamSocket::EnableDirectlySend(int streamFd) const
821 {
822     int32_t enable = 1;
823     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
824     if (ret < 0) {
825         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
826             "Fail to enable direct send for stream: %d, errorcode = %d", streamFd, FtGetErrno());
827         return false;
828     }
829     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to enable direct send for stream: %d", streamFd);
830     return true;
831 }
832 
EnableSemiReliable(int streamFd) const833 bool VtpStreamSocket::EnableSemiReliable(int streamFd) const
834 {
835     int32_t enable = 1;
836     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
837     if (ret < 0) {
838         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
839             "Fail to enable direct send for stream: %d, errorcode = %d", streamFd, FtGetErrno());
840         return false;
841     }
842     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to enable direct send for stream: %d", streamFd);
843     return true;
844 }
845 
RegisterMetricCallback(bool isServer)846 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
847 {
848     VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
849     auto self = this->GetSelf();
850     VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
851     int regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
852     int value = 1;
853     auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
854     if (err < 0) {
855         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "fail to set socket binding to device");
856         return;
857     }
858     SoftBusLog(SOFTBUS_LOG_LNN, SOFTBUS_LOG_INFO, "FtSetSockOpt start success");
859     if (isServer) {
860         if (regStatisticsRet == 0) {
861             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
862                 "Success to register the stream callback function at server side with Fd = %d", streamFd_);
863         } else {
864             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
865                 "Fail to register the stream callback function at server side with Fd = %d, errcode:%d",
866                 streamFd_, regStatisticsRet);
867         }
868     } else {
869         if (regStatisticsRet == 0) {
870             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
871                 "Success to register the stream callback function at client side with Fd = %d", streamFd_);
872         } else {
873             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
874                 "Fail to register the stream callback function at client side with Fd = %d, errcode:%d",
875                 streamFd_, regStatisticsRet);
876         }
877     }
878 }
879 
Accept()880 bool VtpStreamSocket::Accept()
881 {
882     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "accept start");
883 
884     auto fd = FtAccept(listenFd_, nullptr, nullptr);
885     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept streamFd:%d", fd);
886     if (fd == -1) {
887         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "errorcode = %d", FtGetErrno());
888         return false;
889     }
890 
891     sockaddr remoteAddr {};
892     socklen_t remoteAddrLen = sizeof(remoteAddr);
893     auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
894     if (ret != ERR_OK) {
895         FtClose(fd);
896         return false;
897     }
898 
899     char host[ADDR_MAX_SIZE];
900     if (remoteAddr.sa_family == AF_INET) {
901         auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
902         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
903         remoteIpPort_.port = v4Addr->sin_port;
904     } else {
905         auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
906         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
907         remoteIpPort_.port = v6Addr->sin6_port;
908     }
909 
910     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
911         "Accept a client(server port:%d)", remoteIpPort_.port);
912     SetDefaultConfig(fd);
913 
914     if (SetSocketEpollMode(fd) != ERR_OK) {
915         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", fd);
916         FtClose(fd);
917         return false;
918     }
919 
920     std::lock_guard<std::mutex> guard(streamSocketLock_);
921     streamFd_ = fd;
922     configCv_.notify_all();
923 
924     if (streamReceiver_ != nullptr) {
925         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify stream connected!");
926         streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
927     }
928 
929     bool isServer = true;
930     RegisterMetricCallback(isServer); /* register the callback function */
931     /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
932 #ifdef FILLP_SUPPORT_BW_DET
933     EnableBwEstimationAlgo(streamFd_, isServer);
934 #endif
935 #ifdef FILLP_SUPPORT_CQE
936     EnableJitterDetectionAlgo(streamFd_);
937 #endif
938 
939     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept success!");
940     return true;
941 }
942 
EpollTimeout(int fd,int timeout)943 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
944 {
945     struct SpungeEpollEvent events[MAX_EPOLL_NUM];
946     (void)memset_s(events, sizeof(events), 0, sizeof(events));
947     while (true) {
948         FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
949         if (fdNum <= 0) {
950             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
951                 "FtEpollWait failed, ret = %d, errno = %d", fdNum, FtGetErrno());
952             return -FtGetErrno();
953         }
954 
955         for (FILLP_INT i = 0; i < fdNum; i++) {
956             if (events[i].data.fd != fd) {
957                 continue;
958             }
959 
960             if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
961                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
962                     "EpollTimeout, something may be wrong in this socket, fd = %d, events = %u", fd,
963                     (unsigned int)events[i].events);
964                 return -1;
965             }
966 
967             if (events[i].events & SPUNGE_EPOLLIN) {
968                 return 0;
969             }
970         }
971     }
972 }
973 
SetSocketEpollMode(int fd)974 int VtpStreamSocket::SetSocketEpollMode(int fd)
975 {
976     if (!SetNonBlockMode(fd, StreamAttr(true))) {
977         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetNonBlockMode failed, errno = %d", FtGetErrno());
978         return -1;
979     }
980 
981     struct SpungeEpollEvent event = {0};
982     event.events = SPUNGE_EPOLLIN;
983     event.data.fd = fd;
984 
985     auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
986     if (ret != ERR_OK) {
987         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtEpollCtl failed, ret = %d, errno = %d", ret, FtGetErrno());
988         return ret;
989     }
990 
991     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "SetNonBlockMode success");
992     return 0;
993 }
994 
InsertBufferLength(int num,int length,uint8_t * output) const995 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
996 {
997     for (int i = 0; i < length; i++) {
998         output[length - 1 - i] = static_cast<unsigned int>(
999             ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1000     }
1001 }
1002 
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1003 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1004 {
1005     std::unique_ptr<IStream> stream = nullptr;
1006     switch (streamType_) {
1007         case VIDEO_SLICE_STREAM:
1008             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "do not support VIDEO_SLICE_STREAM type = %d", streamType_);
1009             break;
1010         case COMMON_VIDEO_STREAM:
1011         case COMMON_AUDIO_STREAM:
1012             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1013                 "streamType = %d, seqnum=%d, streamid=%d", streamType_, info.seqNum, info.streamId);
1014             stream = IStream::MakeCommonStream(data, info);
1015             break;
1016         case RAW_STREAM:
1017             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "streamType = %d", streamType_);
1018             stream = IStream::MakeRawStream(data, info);
1019             break;
1020         default:
1021             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "do not support type = %d", streamType_);
1022             break;
1023     }
1024     if (stream == nullptr) {
1025         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "IStream construct error");
1026         return nullptr;
1027     }
1028 
1029     return stream;
1030 }
1031 
RecvStreamLen()1032 int VtpStreamSocket::RecvStreamLen()
1033 {
1034     int hdrSize = FRAME_HEADER_LEN;
1035     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1036         hdrSize = streamHdrSize_;
1037     }
1038 
1039     int len = -1;
1040     int timeout = -1;
1041     auto buffer = std::make_unique<char[]>(hdrSize);
1042     if (EpollTimeout(streamFd_, timeout) == 0) {
1043         do {
1044             len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1045         } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1046     }
1047     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv frame header, len = %d, scene:%d", len, scene_);
1048 
1049     if (len <= 0) {
1050         return -1;
1051     }
1052 
1053     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1054         std::lock_guard<std::mutex> guard(streamSocketLock_);
1055         if (streamReceiver_ != nullptr) {
1056             return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1057         }
1058     }
1059 
1060     return ntohl(*reinterpret_cast<int *>(buffer.get()));
1061 }
1062 
DoStreamRecv()1063 void VtpStreamSocket::DoStreamRecv()
1064 {
1065     while (isStreamRecv_) {
1066         std::unique_ptr<char[]> dataBuffer = nullptr;
1067         std::unique_ptr<char[]> extBuffer = nullptr;
1068         int extLen = 0;
1069         StreamFrameInfo info = {};
1070         int dataLength = 0;
1071         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv stream");
1072         dataLength = VtpStreamSocket::RecvStreamLen();
1073         if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1074             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame length error, dataLength = %d", dataLength);
1075             break;
1076         }
1077         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1078             "recv a new frame, dataLength = %d, stream type:%d", dataLength, streamType_);
1079         dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1080 
1081         if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1082             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv common stream");
1083             int decryptedLength = dataLength;
1084             auto decryptedBuffer = std::move(dataBuffer);
1085 
1086             int plainDataLength = decryptedLength - GetEncryptOverhead();
1087             std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1088             ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1089             if (decLen != plainDataLength) {
1090                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1091                     "Decrypt failed, dataLength = %d, decryptedLen = %zd", plainDataLength, decLen);
1092                 break;
1093             }
1094             auto header = plainData.get();
1095             StreamDepacketizer decode(streamType_);
1096             decode.DepacketizeHeader(header);
1097 
1098             auto buffer = plainData.get() + sizeof(CommonHeader);
1099             decode.DepacketizeBuffer(buffer);
1100 
1101             extBuffer = decode.GetUserExt();
1102             extLen = decode.GetUserExtSize();
1103             info.seqNum = decode.GetSeqNum();
1104             info.streamId = decode.GetStreamId();
1105             dataBuffer = decode.GetData();
1106             dataLength = decode.GetDataLength();
1107             if (dataLength <= 0) {
1108                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1109                     "common depacketize error, dataLength = %d", dataLength);
1110                 break;
1111             }
1112         }
1113 
1114         StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1115         std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1116         if (stream == nullptr) {
1117             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "MakeStreamData failed, stream == nullptr");
1118             break;
1119         }
1120 
1121         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1122             "recv frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
1123 
1124         if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1125             std::lock_guard<std::mutex> guard(streamSocketLock_);
1126             if (streamReceiver_ != nullptr) {
1127                 streamReceiver_->OnStreamReceived(std::move(stream));
1128                 continue;
1129             }
1130         }
1131 
1132         PutStream(std::move(stream));
1133         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1134             "put frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
1135     }
1136     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv thread exit");
1137 }
1138 
RecvStream(int dataLength)1139 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int dataLength)
1140 {
1141     auto buffer = std::make_unique<char[]>(dataLength);
1142     int recvLen = 0;
1143     while (recvLen < dataLength) {
1144         int ret = -1;
1145         int timeout = -1;
1146 
1147         if (EpollTimeout(streamFd_, timeout) == 0) {
1148             do {
1149                 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1150             } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1151         }
1152 
1153         if (ret == -1) {
1154             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame failed, errno: %d", FtGetErrno());
1155             return nullptr;
1156         }
1157 
1158         recvLen += ret;
1159     }
1160     return std::unique_ptr<char[]>(buffer.release());
1161 }
1162 
SetDefaultConfig(int fd)1163 void VtpStreamSocket::SetDefaultConfig(int fd)
1164 {
1165     if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1166         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "SetIpTos failed");
1167     }
1168 
1169     if (!EnableDirectlySend(fd)) {
1170         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "EnableDirectlySend failed");
1171     }
1172 
1173     if (!EnableSemiReliable(fd)) {
1174         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "EnableSemiReliable failed");
1175     }
1176 
1177     FILLP_BOOL enable = 1;
1178     if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1179         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set differ transmit failed");
1180     }
1181 
1182     if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1183         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set recv buff failed");
1184     }
1185 
1186     if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1187         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send buff failed");
1188     }
1189 
1190     if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1191         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set recv cache failed");
1192     }
1193 
1194     if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1195         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send cache failed");
1196     }
1197 }
1198 
SetIpTos(int fd,const StreamAttr & tos)1199 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1200 {
1201     auto tmp = tos.GetIntValue();
1202     if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1203         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetIpTos wrong! fd=%d, errorcode=%d", fd, FtGetErrno());
1204         return false;
1205     }
1206 
1207     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to set ip tos: fd=%d, tos=%d", fd, tmp);
1208     return true;
1209 }
1210 
GetIpTos(int type) const1211 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1212 {
1213     static_cast<void>(type);
1214     int tos;
1215     int size = sizeof(tos);
1216 
1217     if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1218         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtGetSockOpt errorcode = %d", FtGetErrno());
1219         return std::move(StreamAttr());
1220     }
1221 
1222     return std::move(StreamAttr(tos));
1223 }
1224 
GetStreamSocketFd(int type) const1225 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1226 {
1227     static_cast<void>(type);
1228     return std::move(StreamAttr(streamFd_));
1229 }
1230 
GetListenSocketFd(int type) const1231 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1232 {
1233     static_cast<void>(type);
1234     return std::move(StreamAttr(listenFd_));
1235 }
1236 
SetSocketBoundInner(int fd,std::string ip) const1237 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1238 {
1239     auto boundIp = (ip == "") ? localIpPort_.ip : ip;
1240     struct ifaddrs *ifList = nullptr;
1241     if (getifaddrs(&ifList) < 0) {
1242         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1243             "get interface address return error %d (%s)", errno, strerror(errno));
1244         return false;
1245     }
1246 
1247     struct ifaddrs *ifa = nullptr;
1248     for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1249         if (ifa->ifa_addr == nullptr) {
1250             continue;
1251         }
1252         if (ifa->ifa_addr->sa_family != AF_INET) {
1253             continue;
1254         }
1255 
1256         char host[ADDR_MAX_SIZE];
1257         std::string devName(ifa->ifa_name);
1258         if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1259             host, ADDR_MAX_SIZE)) == 0) {
1260             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "current use interface %s to bind to socket", ifa->ifa_name);
1261             auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1262             if (err < 0) {
1263                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "fail to set socket binding to device");
1264                 freeifaddrs(ifList);
1265                 return false;
1266             }
1267             break;
1268         }
1269     }
1270     freeifaddrs(ifList);
1271 
1272     return true;
1273 }
1274 
SetSocketBindToDevices(int type,const StreamAttr & ip)1275 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1276 {
1277     static_cast<void>(type);
1278     auto tmp = ip.GetStrValue();
1279     auto boundIp = (tmp == "") ? localIpPort_.ip : tmp;
1280     return SetSocketBoundInner(streamFd_, boundIp);
1281 }
1282 
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1283 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1284 {
1285     std::unique_lock<std::mutex> lock(streamSocketLock_);
1286     if (streamFd_ == -1) {
1287         configCv_.wait(lock, [this] { return streamFd_ != -1; });
1288     }
1289     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config, streamFd = %d", streamFd_);
1290     return SetVtpStackConfig(type, value);
1291 }
1292 
SetVtpStackConfig(int type,const StreamAttr & value)1293 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1294 {
1295     if (streamFd_ == -1) {
1296         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config when streamFd is legal");
1297         auto self = GetSelf();
1298         std::thread([self, type, value]() { self->SetVtpStackConfigDelayed(type, value); }).detach();
1299         return true;
1300     }
1301 
1302     if (value.GetType() == INT_TYPE) {
1303         int intVal = value.GetIntValue();
1304         int ret = FtConfigSet(type, &intVal, &streamFd_);
1305         if (ret != 0) {
1306             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1307                 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1308             return false;
1309         }
1310 
1311         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1312             "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, intVal);
1313         return true;
1314     }
1315 
1316     if (value.GetType() == BOOL_TYPE) {
1317         bool flag = value.GetBoolValue();
1318         int ret = FtConfigSet(type, &flag, &streamFd_);
1319         if (ret != 0) {
1320             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1321                 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1322             return false;
1323         }
1324 
1325         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1326             "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, flag);
1327         return true;
1328     }
1329 
1330     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "UNKNOWN TYPE!");
1331     return false;
1332 }
1333 
GetVtpStackConfig(int type) const1334 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1335 {
1336     int intVal = -1;
1337     int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1338     int ret = FtConfigGet(type, &intVal, &configFd);
1339     if (ret != 0) {
1340         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1341             "FtConfigGet failed, type = %d, errorcode = %d", type, FtGetErrno());
1342         return std::move(StreamAttr());
1343     }
1344 
1345     int valType = ValueType::UNKNOWN;
1346     for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1347         if (it->second != type) {
1348             continue;
1349         }
1350 
1351         valType = optFuncMap_.at(it->first).valueType;
1352         break;
1353     }
1354 
1355     if (valType != ValueType::UNKNOWN) {
1356         for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1357             if (it->second != type) {
1358                 continue;
1359             }
1360 
1361             valType = optFuncMap_.at(it->first).valueType;
1362             break;
1363         }
1364     }
1365 
1366     if (valType == BOOL_TYPE) {
1367         return std::move(StreamAttr(!!intVal));
1368     }
1369 
1370     return std::move(StreamAttr(intVal));
1371 }
1372 
SetNonBlockMode(int fd,const StreamAttr & value)1373 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1374 {
1375     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1376     if (flags < 0) {
1377         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to get FtFcntl, flags = %d", flags);
1378         flags = 0;
1379     }
1380     bool nonBlock = value.GetBoolValue();
1381 
1382     flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1383         static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1384 
1385     FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1386     if (res < 0) {
1387         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to set FtFcntl, res = %d", res);
1388         return false;
1389     }
1390 
1391     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Successfully to set fd(%d) nonBlock mode = %d", fd, nonBlock);
1392     return true;
1393 }
1394 
GetNonBlockMode(int fd) const1395 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1396 {
1397     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1398     if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1399         return std::move(StreamAttr(true));
1400     }
1401 
1402     return std::move(StreamAttr(false));
1403 }
1404 
GetIp(int type) const1405 StreamAttr VtpStreamSocket::GetIp(int type) const
1406 {
1407     if (type == LOCAL_IP) {
1408         return std::move(StreamAttr(localIpPort_.ip));
1409     }
1410 
1411     return std::move(StreamAttr(remoteIpPort_.ip));
1412 }
1413 
GetPort(int type) const1414 StreamAttr VtpStreamSocket::GetPort(int type) const
1415 {
1416     if (type == LOCAL_PORT) {
1417         return std::move(StreamAttr(localIpPort_.port));
1418     }
1419     return std::move(StreamAttr(remoteIpPort_.port));
1420 }
1421 
SetStreamType(int type,const StreamAttr & value)1422 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1423 {
1424     if (type != STREAM_TYPE_INT) {
1425         return false;
1426     }
1427 
1428     streamType_ = value.GetIntValue();
1429     return true;
1430 }
1431 
GetStreamType(int type) const1432 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1433 {
1434     if (type != STREAM_TYPE_INT) {
1435         return std::move(StreamAttr());
1436     }
1437 
1438     return std::move(StreamAttr(streamType_));
1439 }
1440 
SetStreamScene(int type,const StreamAttr & value)1441 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1442 {
1443     static_cast<void>(type);
1444     if (value.GetType() != INT_TYPE) {
1445         return false;
1446     }
1447     scene_ = value.GetIntValue();
1448     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set scene to %d", scene_);
1449     return true;
1450 }
1451 
SetStreamHeaderSize(int type,const StreamAttr & value)1452 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1453 {
1454     static_cast<void>(type);
1455     if (value.GetType() != INT_TYPE) {
1456         return false;
1457     }
1458     streamHdrSize_ = value.GetIntValue();
1459     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set header size to %d", streamHdrSize_);
1460     return true;
1461 }
1462 
NotifyStreamListener()1463 void VtpStreamSocket::NotifyStreamListener()
1464 {
1465     while (isStreamRecv_) {
1466         int streamNum = GetStreamNum();
1467         if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1468             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Too many data in receiver, num = %d", streamNum);
1469         }
1470 
1471         auto stream = TakeStream();
1472         if (stream == nullptr) {
1473             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Pop stream failed");
1474             break;
1475         }
1476 
1477         std::lock_guard<std::mutex> guard(streamSocketLock_);
1478         if (streamReceiver_ != nullptr) {
1479             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener");
1480             streamReceiver_->OnStreamReceived(std::move(stream));
1481             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener done.");
1482         }
1483     }
1484     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify thread exit");
1485 }
1486 
GetEncryptOverhead() const1487 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1488 {
1489     return OVERHEAD_LEN;
1490 }
1491 
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1492 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1493 {
1494     AesGcmCipherKey cipherKey = {0};
1495 
1496     if (inLen - OVERHEAD_LEN > outLen) {
1497         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt invalid para.");
1498         return SOFTBUS_ERR;
1499     }
1500 
1501     cipherKey.keyLen = SESSION_KEY_LENGTH;
1502     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1503         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1504         return SOFTBUS_ERR;
1505     }
1506 
1507     int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1508     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1509     if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1510         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt Data fail. %d", ret);
1511         return SOFTBUS_ENCRYPT_ERR;
1512     }
1513     return outLen;
1514 }
1515 
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1516 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1517 {
1518     AesGcmCipherKey cipherKey = {0};
1519 
1520     if (inLen - OVERHEAD_LEN > outLen) {
1521         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt invalid para.");
1522         return SOFTBUS_ERR;
1523     }
1524 
1525     cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1526     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1527         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1528         return SOFTBUS_ERR;
1529     }
1530     int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1531     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1532     if (ret != SOFTBUS_OK) {
1533         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt Data fail. %d ", ret);
1534         return SOFTBUS_DECRYPT_ERR;
1535     }
1536 
1537     return outLen;
1538 }
1539 } // namespace SoftBus
1540 } // namespace Communication
1541