• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "vtp_stream_socket.h"
17 
18 #include <ifaddrs.h>
19 #include <thread>
20 
21 #include "softbus_adapter_crypto.h"
22 #include "softbus_adapter_socket.h"
23 #include "stream_depacketizer.h"
24 #include "stream_packetizer.h"
25 
26 namespace Communication {
27 namespace SoftBus {
28 bool g_logOn = false;
29 const int FEED_BACK_PERIOD = 1;  /* feedback period of fillp stream traffic statistics is 1s */
30 const int MS_PER_SECOND = 1000;
31 const int US_PER_MS = 1000;
32 
33 namespace {
PrintOptionInfo(int type,const StreamAttr & value)34 void PrintOptionInfo(int type, const StreamAttr &value)
35 {
36     switch (value.GetType()) {
37         case INT_TYPE:
38             TRANS_LOGI(TRANS_STREAM,
39                 "Int option: type=%{public}d, value=%{public}d", type, value.GetIntValue());
40             break;
41         case BOOL_TYPE:
42             TRANS_LOGI(TRANS_STREAM,
43                 "Bool option: type=%{public}d, value=%{public}d", type, value.GetBoolValue());
44             break;
45         case STRING_TYPE:
46             TRANS_LOGD(TRANS_STREAM,
47                 "String option: type=%{public}d, value=%{public}s", type, value.GetStrValue().c_str());
48             break;
49         default:
50             TRANS_LOGE(TRANS_STREAM, "Wrong StreamAttr!");
51             (void)type;
52     }
53 }
54 } // namespace
55 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
56 
57 std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
58 std::mutex VtpStreamSocket::g_streamSocketLockMapLock_;
59 std::map<int, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
60 std::mutex VtpStreamSocket::g_streamSocketMapLock_;
61 
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)62 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo* frameInfo,
63     const Communication::SoftBus::StreamFrameInfo* streamFrameInfo)
64 {
65     frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
66     frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
67     frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
68     frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
69     frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
70     frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
71 }
72 
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)73 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
74 {
75     std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
76     if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
77         TRANS_LOGE(TRANS_STREAM, "streamsocketlock for the fd already exists. fd=%{public}d", fd);
78         return;
79     }
80 
81     g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
82 }
83 
AddStreamSocketListener(int fd,std::shared_ptr<VtpStreamSocket> streamreceiver)84 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
85 {
86     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
87     if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
88         TRANS_LOGE(TRANS_STREAM, "streamreceiver for the fd already exists. fd=%{public}d", fd);
89         return;
90     }
91 
92     g_streamSocketMap.emplace(std::pair<int, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
93 }
94 
RemoveStreamSocketLock(int fd)95 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
96 {
97     std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
98     if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
99         g_streamSocketLockMap.erase(fd);
100         TRANS_LOGI(TRANS_STREAM, "Remove streamsocketlock for the fd success. fd=%{public}d", fd);
101     } else {
102         TRANS_LOGE(TRANS_STREAM,
103             "Streamsocketlock for the fd not exist in the map. fd=%{public}d", fd);
104     }
105 }
106 
RemoveStreamSocketListener(int fd)107 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
108 {
109     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
110     if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
111         g_streamSocketMap.erase(fd);
112         TRANS_LOGI(TRANS_STREAM, "Remove streamreceiver for the fd success. fd=%{public}d", fd);
113     } else {
114         TRANS_LOGE(TRANS_STREAM, "Streamreceiver for the fd not exist in the map. fd=%{public}d", fd);
115     }
116 }
117 
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)118 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
119 {
120     OptionFunc fun = {
121         valueType, set, get
122     };
123     optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
124 }
125 
VtpStreamSocket()126 VtpStreamSocket::VtpStreamSocket()
127 {
128     InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
129     InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
130     InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
131     InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
132     InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
133     InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
134     InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
135     InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
136     InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
137     InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
138     InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
139     InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
140     InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
141         &VtpStreamSocket::GetVtpStackConfig);
142     InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
143         &VtpStreamSocket::GetVtpStackConfig);
144     InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
145         &VtpStreamSocket::GetVtpStackConfig);
146     InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
147         &VtpStreamSocket::GetVtpStackConfig);
148     InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
149         &VtpStreamSocket::GetVtpStackConfig);
150     InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
151         &VtpStreamSocket::GetVtpStackConfig);
152     InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
153         &VtpStreamSocket::GetVtpStackConfig);
154     InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
155         &VtpStreamSocket::GetVtpStackConfig);
156     InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
157         &VtpStreamSocket::GetVtpStackConfig);
158     InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
159         &VtpStreamSocket::GetVtpStackConfig);
160     InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
161         &VtpStreamSocket::GetVtpStackConfig);
162     InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
163         &VtpStreamSocket::GetVtpStackConfig);
164     InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
165     InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
166     InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
167     InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
168 
169     scene_ = UNKNOWN_SCENE;
170 }
171 
~VtpStreamSocket()172 VtpStreamSocket::~VtpStreamSocket()
173 {
174     TRANS_LOGW(TRANS_STREAM, "~VtpStreamSocket");
175 }
176 
GetSelf()177 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
178 {
179     return shared_from_this();
180 }
181 
HandleFillpFrameStats(int fd,const FtEventCbkInfo * info)182 int VtpStreamSocket::HandleFillpFrameStats(int fd, const FtEventCbkInfo *info)
183 {
184     if (info == nullptr) {
185         TRANS_LOGE(TRANS_STREAM, "stats info is nullptr");
186         return SOFTBUS_INVALID_PARAM;
187     }
188     StreamSendStats stats = {};
189     if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
190         sizeof(info->info.frameSendStats)) != EOK) {
191         TRANS_LOGE(TRANS_STREAM, "streamStats info memcpy fail");
192         return SOFTBUS_MEM_ERR;
193     }
194 
195     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
196     auto itListener = g_streamSocketMap.find(fd);
197     if (itListener != g_streamSocketMap.end()) {
198         if (itListener->second->streamReceiver_ != nullptr) {
199             TRANS_LOGD(TRANS_STREAM, "OnFrameStats enter");
200             itListener->second->streamReceiver_->OnFrameStats(&stats);
201         } else {
202             TRANS_LOGE(TRANS_STREAM, "streamReceiver_ is nullptr");
203         }
204     } else {
205         TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the fd is empty in the map. fd=%{public}d", fd);
206     }
207     return SOFTBUS_OK;
208 }
209 
HandleRipplePolicy(int fd,const FtEventCbkInfo * info)210 int VtpStreamSocket::HandleRipplePolicy(int fd, const FtEventCbkInfo *info)
211 {
212     if (info == nullptr) {
213         TRANS_LOGE(TRANS_STREAM, "policy info is nullptr");
214         return SOFTBUS_INVALID_PARAM;
215     }
216     TrafficStats stats;
217     (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
218     if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
219         sizeof(info->info.trafficData.stats)) != EOK) {
220         TRANS_LOGE(TRANS_STREAM, "RipplePolicy info memcpy fail");
221         return SOFTBUS_MEM_ERR;
222     }
223     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
224     auto itListener = g_streamSocketMap.find(fd);
225     if (itListener != g_streamSocketMap.end()) {
226         if (itListener->second->streamReceiver_ != nullptr) {
227             TRANS_LOGI(TRANS_STREAM, "OnRippleStats enter");
228             itListener->second->streamReceiver_->OnRippleStats(&stats);
229         } else {
230             TRANS_LOGE(TRANS_STREAM, "OnRippleStats streamReceiver_ is nullptr");
231         }
232     } else {
233         TRANS_LOGE(TRANS_STREAM,
234             "OnRippleStats streamReceiver for the fd is empty in the map. fd=%{public}d", fd);
235     }
236     return SOFTBUS_OK;
237 }
238 
HandleFillpFrameEvt(int fd,const FtEventCbkInfo * info)239 int VtpStreamSocket::HandleFillpFrameEvt(int fd, const FtEventCbkInfo *info)
240 {
241     if (info == nullptr) {
242         TRANS_LOGE(TRANS_STREAM, "fd is %{public}d, info is nullptr", fd);
243         return SOFTBUS_INVALID_PARAM;
244     }
245     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
246     auto itListener = g_streamSocketMap.find(fd);
247     if (itListener != g_streamSocketMap.end()) {
248         return itListener->second->HandleFillpFrameEvtInner(fd, info);
249     } else {
250         TRANS_LOGE(TRANS_STREAM, "OnFillpFrameEvt for the fd is empty in the map. fd=%{public}d", fd);
251     }
252     return SOFTBUS_OK;
253 }
254 
HandleFillpFrameEvtInner(int fd,const FtEventCbkInfo * info)255 int VtpStreamSocket::HandleFillpFrameEvtInner(int fd, const FtEventCbkInfo *info)
256 {
257     if (onStreamEvtCb_ != nullptr) {
258         TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ enter");
259         return HandleVtpFrameEvt(fd, onStreamEvtCb_, info);
260     } else {
261         TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ is nullptr");
262     }
263     return SOFTBUS_OK;
264 }
265 
266 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int fd,const FtEventCbkInfo * info,QosTv * metricList)267 void VtpStreamSocket::FillSupportDet(int fd, const FtEventCbkInfo *info, QosTv *metricList)
268 {
269     if (info == nullptr || metricList == nullptr) {
270         TRANS_LOGE(TRANS_STREAM, "info or metricList is nullptr");
271         return;
272     }
273     if (info->evt == FT_EVT_BW_DET) {
274         TRANS_LOGI(TRANS_STREAM,
275             "[Metric Return]: Fillp bandwidth information of socket fd=%{public}d is returned", fd);
276         TRANS_LOGI(TRANS_STREAM,
277             "[Metric Return]: Changed amount of current available bandwidth=%{public}d", info->info.bwInfo.bwStat);
278         TRANS_LOGI(TRANS_STREAM,
279             "[Metric Return]: Current bandwidth for receiving data rate=%{public}d kbps", info->info.bwInfo.rate);
280         metricList->type = BANDWIDTH_ESTIMATE_VALUE;
281         metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
282         metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
283     }
284     if (info->evt == FT_EVT_JITTER_DET) {
285         TRANS_LOGI(TRANS_STREAM,
286             "[Metric Return]: Fillp connection quality information of socket fd=%{public}d is returned", fd);
287         TRANS_LOGI(TRANS_STREAM,
288             "[Metric Return]: Predeicted network condition jitterLevel=%{public}d", info->info.jitterInfo.jitterLevel);
289         TRANS_LOGI(TRANS_STREAM,
290             "[Metric Return]: Current available receiving buffer time=%{public}d ms", info->info.jitterInfo.bufferTime);
291         metricList->type = JITTER_DETECTION_VALUE;
292         metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
293         metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
294     }
295 }
296 #endif
297 
298 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int fd,const FtEventCbkInfo * info)299 int VtpStreamSocket::FillpStatistics(int fd, const FtEventCbkInfo *info)
300 {
301     if (info == nullptr || fd < 0) {
302         TRANS_LOGE(TRANS_STREAM, "param invalid fd is %{public}d", fd);
303         return SOFTBUS_INVALID_PARAM;
304     }
305     if (info->evt == FT_EVT_FRAME_STATS) {
306         TRANS_LOGI(TRANS_STREAM, "recv fillp frame stats");
307         return HandleFillpFrameStats(fd, info);
308     } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
309         TRANS_LOGI(TRANS_STREAM, "recv fillp traffic data");
310         return HandleRipplePolicy(fd, info);
311     } else if (IsVtpFrameSentEvt(info)) {
312         TRANS_LOGI(TRANS_STREAM, "fd %{public}d recv fillp frame send evt", fd);
313         return HandleFillpFrameEvt(fd, info);
314     }
315 #ifdef FILLP_SUPPORT_BW_DET
316     if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
317         int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
318         int16_t tvCount = 1;
319         QosTv metricList = {};
320 
321         FillSupportDet(fd, info, &metricList);
322         metricList.info.wifiChannelInfo = {};
323         metricList.info.frameStatusInfo = {};
324 
325         std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
326         auto itLock = g_streamSocketLockMap.find(fd);
327         if (itLock != g_streamSocketLockMap.end()) {
328             std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
329             auto itListener = g_streamSocketMap.find(fd);
330             if (itListener != g_streamSocketMap.end()) {
331                 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
332                     const std::string threadName = "OS_qosEvent";
333                     pthread_setname_np(pthread_self(), threadName.c_str());
334                     std::lock_guard<std::mutex> guard(itLock->second);
335                     itListener->second->OnQosEvent(eventId, tvCount, &metricList);
336                 }).detach();
337             } else {
338                 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for fd=%{public}d is empty in the map", fd);
339             }
340         } else {
341             TRANS_LOGE(TRANS_STREAM, "StreamSocketLock for fd=%{public}d is empty in the map", fd);
342         }
343     } else {
344         TRANS_LOGE(TRANS_STREAM,
345             "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
346         return -1;
347     }
348 #endif
349     return SOFTBUS_OK;
350 }
351 
FillpAppStatistics()352 void VtpStreamSocket::FillpAppStatistics()
353 {
354     int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
355     int16_t tvCount = 1;
356     QosTv metricList = {};
357     FillpStatisticsPcb fillpPcbStats = {};
358     SoftBusSysTime fillpStatsGetTime = {0};
359 
360     int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
361     SoftBusGetTime(&fillpStatsGetTime);
362     if (getStatisticsRet == 0) {
363         metricList.type = STREAM_TRAFFIC_STASTICS;
364         metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
365             MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
366         metricList.info.appStatistics.periodRecvBits =
367             static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
368         metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
369         metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
370         metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
371         metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
372         metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
373         metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
374         metricList.info.appStatistics.periodRecvPktLossHighPrecision =
375             fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
376         metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
377         metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
378         metricList.info.appStatistics.periodSendPktLossHighPrecision =
379             fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
380         metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
381         metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
382 
383         TRANS_LOGD(TRANS_STREAM,
384             "Succeed to get fillp statistics information for streamfd=%{public}d", streamFd_);
385         TRANS_LOGD(TRANS_STREAM,
386             "[Metric Return]: periodRtt=%{public}d", fillpPcbStats.appFcStastics.periodRtt);
387 
388         std::lock_guard<std::mutex> guard(streamSocketLock_);
389 
390         if (streamReceiver_ != nullptr) {
391             TRANS_LOGD(TRANS_STREAM,
392                 "[Metric Notify]: Fillp traffic statistics information of socket is notified. streamfd=%{public}d",
393                 streamFd_);
394             streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
395         } else {
396             TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the streamFd is empty. streamFd=%{public}d", streamFd_);
397         }
398     } else {
399         TRANS_LOGE(TRANS_STREAM,
400             "Fail to get fillp statistics information for the streamfd. streamfd=%{public}d, errno=%{public}d",
401             streamFd_, FtGetErrno());
402     }
403 }
404 
CreateClient(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)405 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
406 {
407     int fd = CreateAndBindSocket(local, false);
408     if (fd == -1) {
409         TRANS_LOGE(TRANS_STREAM, "CreateAndBindSocket failed, errno=%{public}d", FtGetErrno());
410         DestroyStreamSocket();
411         return false;
412     }
413 
414     sessionKey_.second = sessionKey.second;
415     if (sessionKey_.first == nullptr) {
416         sessionKey_.first = new uint8_t[sessionKey_.second];
417     }
418     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
419         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
420         return false;
421     }
422 
423     streamType_ = streamType;
424     std::lock_guard<std::mutex> guard(streamSocketLock_);
425     streamFd_ = fd;
426     configCv_.notify_all();
427 
428     TRANS_LOGI(TRANS_STREAM,
429         "Success to create a client socket. fd=%{public}d, streamType=%{public}d", fd, streamType);
430     return true;
431 }
432 
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)433 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
434     std::pair<uint8_t*, uint32_t> sessionKey)
435 {
436     if (!CreateClient(local, streamType, sessionKey)) {
437         return false;
438     }
439     /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
440 #ifdef FILLP_SUPPORT_BW_DET
441     bool isServer = false;
442     EnableBwEstimationAlgo(streamFd_, isServer);
443 #endif
444 
445     bool connectRet = Connect(remote);
446     if (connectRet) {
447         bool isServer = false;
448         RegisterMetricCallback(isServer); /* register the callback function */
449     }
450     return connectRet;
451 }
452 
CreateServer(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)453 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
454 {
455     TRANS_LOGD(TRANS_STREAM, "enter.");
456     listenFd_ = CreateAndBindSocket(local, true);
457     if (listenFd_ == -1) {
458         TRANS_LOGE(TRANS_STREAM, "create listenFd failed, errno=%{public}d", FtGetErrno());
459         DestroyStreamSocket();
460         return false;
461     }
462 
463     bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
464     if (ret) {
465         TRANS_LOGE(TRANS_STREAM, "FtListen failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
466         DestroyStreamSocket();
467         return false;
468     }
469 
470     epollFd_ = FtEpollCreate();
471     if (epollFd_ < 0) {
472         TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
473         DestroyStreamSocket();
474         return false;
475     }
476     isStreamRecv_ = true;
477     streamType_ = streamType;
478     sessionKey_.second = sessionKey.second;
479     if (sessionKey_.first == nullptr) {
480         sessionKey_.first = new uint8_t[sessionKey_.second];
481     }
482     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
483         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
484         return false;
485     }
486 
487     CreateServerProcessThread();
488     TRANS_LOGI(TRANS_STREAM,
489         "CreateServer end, listenFd=%{public}d, epollFd=%{public}d, streamType=%{public}d", listenFd_, epollFd_,
490         streamType_);
491     return true;
492 }
493 
DestroyStreamSocket()494 void VtpStreamSocket::DestroyStreamSocket()
495 {
496     TRANS_LOGD(TRANS_STREAM, "enter.");
497     std::lock_guard<std::mutex> guard(streamSocketLock_);
498     if (isDestroyed_) {
499         TRANS_LOGI(TRANS_STREAM, "StreamSocket is already destroyed");
500         return;
501     }
502     if (listenFd_ != -1) {
503         TRANS_LOGI(TRANS_STREAM, "listenFd_ enter FtClose");
504         FtClose(listenFd_);
505         listenFd_ = -1;
506     }
507 
508     if (streamFd_ != -1) {
509         RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
510         RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
511         TRANS_LOGI(TRANS_STREAM, "streamFd_ enter FtClose");
512         FtClose(streamFd_);
513         streamFd_ = -1;
514     }
515 
516     if (epollFd_ != -1) {
517         TRANS_LOGI(TRANS_STREAM, "epollFd_ enter FtClose");
518         FtClose(epollFd_);
519         epollFd_ = -1;
520     }
521 
522     if (streamReceiver_ != nullptr) {
523         TRANS_LOGI(TRANS_STREAM, "DestroyStreamSocket receiver delete");
524         streamReceiver_->OnStreamStatus(STREAM_CLOSED);
525         streamReceiver_.reset();
526     }
527 
528     QuitStreamBuffer();
529     vtpInstance_->UpdateSocketStreamCount(false);
530     isDestroyed_ = true;
531     TRANS_LOGD(TRANS_STREAM, "ok");
532 }
533 
Connect(const IpAndPort & remote)534 bool VtpStreamSocket::Connect(const IpAndPort &remote)
535 {
536     if (remote.ip.empty()) {
537         TRANS_LOGE(TRANS_STREAM, "remote addr  error, ip is nullptr");
538         DestroyStreamSocket();
539         return false;
540     }
541 
542     TRANS_LOGD(TRANS_STREAM,
543         "Connect to server remotePort=%{public}d", remote.port);
544     remoteIpPort_ = remote;
545 
546     struct sockaddr_in remoteSockAddr;
547     remoteSockAddr.sin_family = AF_INET;
548     remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
549     remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
550 
551     int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
552     if (ret != SOFTBUS_OK) {
553         TRANS_LOGE(TRANS_STREAM, "FtConnect failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
554         DestroyStreamSocket();
555         return false;
556     }
557 
558     epollFd_ = FtEpollCreate();
559     if (epollFd_ < 0) {
560         TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
561         DestroyStreamSocket();
562         return false;
563     }
564 
565     if (SetSocketEpollMode(streamFd_) != ERR_OK) {
566         TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, streamFd=%{public}d", streamFd_);
567         DestroyStreamSocket();
568         return false;
569     }
570     isStreamRecv_ = true;
571     TRANS_LOGI(TRANS_STREAM, "Success to connect remote, and create a thread to recv data.");
572 
573     CreateClientProcessThread();
574     return true;
575 }
576 
EncryptStreamPacket(std::unique_ptr<IStream> stream,std::unique_ptr<char[]> & data,ssize_t & len)577 bool VtpStreamSocket::EncryptStreamPacket(std::unique_ptr<IStream> stream, std::unique_ptr<char[]> &data, ssize_t &len)
578 {
579     StreamPacketizer packet(streamType_, std::move(stream));
580     auto plainData = packet.PacketizeStream();
581     if (plainData == nullptr) {
582         TRANS_LOGE(TRANS_STREAM, "PacketizeStream failed");
583         return false;
584     }
585     len = packet.GetPacketLen() + GetEncryptOverhead();
586     TRANS_LOGD(TRANS_STREAM, "packetLen=%{public}zd, encryptOverhead=%{public}zd",
587         packet.GetPacketLen(), GetEncryptOverhead());
588     data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
589     ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
590     if (encLen != len) {
591         TRANS_LOGE(TRANS_STREAM, "encrypted failed, dataLen=%{public}zd, encLen=%{public}zd", len, encLen);
592         return false;
593     }
594     InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
595     len += FRAME_HEADER_LEN;
596 
597     return true;
598 }
599 
Send(std::unique_ptr<IStream> stream)600 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
601 {
602     TRANS_LOGD(TRANS_STREAM, "send in... streamType=%{public}d, dataSize=%{public}zd, extSize=%{public}zd",
603         streamType_, stream->GetBufferLen(), stream->GetExtBufferLen());
604 
605     if (!isBlocked_) {
606         isBlocked_ = true;
607         if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
608             TRANS_LOGE(TRANS_STREAM, "set non block mode fail");
609             return false;
610         }
611     }
612 
613     int32_t ret = -1;
614     std::unique_ptr<char[]> data = nullptr;
615     ssize_t len = 0;
616 
617     const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
618     if (streamFrameInfo == nullptr) {
619         TRANS_LOGE(TRANS_STREAM, "streamFrameInfo is null");
620         return false;
621     }
622     FrameInfo frameInfo;
623     ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
624 
625     if (streamType_ == RAW_STREAM) {
626         data = stream->GetBuffer();
627         len = stream->GetBufferLen();
628 
629         ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
630     } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
631         if (!EncryptStreamPacket(std::move(stream), data, len)) {
632             return false;
633         }
634         ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
635     }
636 
637     if (ret == -1) {
638         TRANS_LOGE(TRANS_STREAM, "send failed, errno=%{public}d", FtGetErrno());
639         return false;
640     }
641 
642     TRANS_LOGD(TRANS_STREAM, "send out..., streamType=%{public}d, len=%{public}zd", streamType_, len);
643     return true;
644 }
645 
SetOption(int type,const StreamAttr & value)646 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
647 {
648     PrintOptionInfo(type, value);
649     auto it = optFuncMap_.find(type);
650     if (it == optFuncMap_.end()) {
651         TRANS_LOGW(TRANS_STREAM, "not found type=%{public}d", type);
652         return false;
653     }
654 
655     if (value.GetType() != it->second.valueType) {
656         TRANS_LOGW(TRANS_STREAM,
657             "type=%{public}d, valueType=%{public}d", value.GetType(), it->second.valueType);
658         return false;
659     }
660 
661     MySetFunc set = it->second.set;
662     if (set == nullptr) {
663         TRANS_LOGW(TRANS_STREAM, "set is nullptr");
664         return false;
665     }
666 
667     if (type == NON_BLOCK || type == TOS) {
668         return (this->*set)(static_cast<int>(streamFd_), value);
669     }
670 
671     auto outerIt = FILLP_TYPE_MAP.find(type);
672     if (outerIt != FILLP_TYPE_MAP.end()) {
673         return (this->*set)(outerIt->second, value);
674     }
675 
676     auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
677     if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
678         return (this->*set)(innerIt->second, value);
679     }
680 
681     return (this->*set)(static_cast<int>(type), value);
682 }
683 
GetOption(int type) const684 StreamAttr VtpStreamSocket::GetOption(int type) const
685 {
686     StreamAttr attr {};
687     auto it = optFuncMap_.find(type);
688     if (it != optFuncMap_.end()) {
689         MyGetFunc get = it->second.get;
690         if (get == nullptr) {
691             TRANS_LOGE(TRANS_STREAM, "Can not get option type=%{public}d", type);
692             return std::move(StreamAttr());
693         }
694         if (type == NON_BLOCK) {
695             attr = (this->*get)(static_cast<int>(streamFd_));
696         } else {
697             attr = (this->*get)(static_cast<int>(type));
698         }
699     }
700 
701     PrintOptionInfo(type, attr);
702     return attr;
703 }
704 
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)705 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
706 {
707     if (receiver == nullptr) {
708         TRANS_LOGW(TRANS_STREAM, "receiver is nullptr");
709         return false;
710     }
711 
712     std::lock_guard<std::mutex> guard(streamSocketLock_);
713     streamReceiver_ = receiver;
714     TRANS_LOGI(TRANS_STREAM, "set receiver success");
715     return true;
716 }
717 
InitVtpInstance(const std::string & pkgName)718 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
719 {
720     return vtpInstance_->InitVtp(pkgName);
721 }
722 
DestroyVtpInstance(const std::string & pkgName)723 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
724 {
725     TRANS_LOGD(TRANS_STREAM, "enter.");
726     vtpInstance_->DestroyVtp(pkgName);
727     TRANS_LOGD(TRANS_STREAM, "ok");
728 }
729 
CreateAndBindSocket(IpAndPort & local,bool isServer)730 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local, bool isServer)
731 {
732     localIpPort_ = local;
733     vtpInstance_->UpdateSocketStreamCount(true);
734     if (local.ip.empty()) {
735         TRANS_LOGE(TRANS_STREAM, "ip is empty");
736         return -1;
737     }
738 
739     int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
740     if (sockFd == -1) {
741         TRANS_LOGE(TRANS_STREAM, "FtSocket failed, errno=%{public}d", FtGetErrno());
742         return -1;
743     }
744     if (!isServer) {
745         TRANS_LOGI(TRANS_STREAM, "FtSocket set client, errno=%{public}d", FtGetErrno());
746         streamFd_ = sockFd;
747     }
748     SetDefaultConfig(sockFd);
749 
750     // bind
751     sockaddr_in localSockAddr = { 0 };
752     char host[ADDR_MAX_SIZE];
753     localSockAddr.sin_family = AF_INET;
754     localSockAddr.sin_port = htons((short)local.port);
755     localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
756     localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
757     if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
758         TRANS_LOGE(TRANS_STREAM, "SetSocketBoundInner failed, errno=%{public}d", FtGetErrno());
759     }
760 
761     socklen_t localAddrLen = sizeof(localSockAddr);
762     int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
763     if (ret == -1) {
764         FtClose(sockFd);
765         TRANS_LOGE(TRANS_STREAM, "FtBind failed, errno=%{public}d", FtGetErrno());
766         return -1;
767     }
768 
769     // 获取port
770     ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
771     if (ret != ERR_OK) {
772         TRANS_LOGE(TRANS_STREAM, "getsockname error ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
773         FtClose(sockFd);
774         return -1;
775     }
776 
777     localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
778     local.port = localIpPort_.port;
779 
780     return sockFd;
781 }
782 
783 
EnableBwEstimationAlgo(int streamFd,bool isServer) const784 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
785 {
786 #ifdef FILLP_SUPPORT_BW_DET
787     int errBwDet;
788     if (isServer) {
789         int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
790         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
791             &enableBwDet, sizeof(enableBwDet));
792     } else {
793         int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
794         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
795             &enableBwDet, sizeof(enableBwDet));
796     }
797     if (errBwDet < 0) {
798         TRANS_LOGE(TRANS_STREAM,
799             "Fail to enable bandwidth estimation algorithm for streamFd=%{public}d, errno%{public}d",
800             streamFd, FtGetErrno());
801         return true;
802     } else {
803         TRANS_LOGE(TRANS_STREAM,
804             "Success to enable bandwidth estimation algorithm for stream=Fd%{public}d", streamFd);
805         return false;
806     }
807 #else
808     return true;
809 #endif
810 }
811 
EnableJitterDetectionAlgo(int streamFd) const812 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
813 {
814 #ifdef FILLP_SUPPORT_CQE
815     int32_t  enableCQE = FILLP_CQE_ENABLE;
816     auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
817     if (errCQE < 0) {
818         TRANS_LOGE(TRANS_STREAM,
819             "Fail to enable CQE algorithm for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
820         return true;
821     } else {
822         TRANS_LOGE(TRANS_STREAM,
823             "Success to enable CQE algorithm for streamFd=%{public}d", streamFd);
824         return false;
825     }
826 #else
827     return true;
828 #endif
829 }
830 
EnableDirectlySend(int streamFd) const831 bool VtpStreamSocket::EnableDirectlySend(int streamFd) const
832 {
833     int32_t enable = 1;
834     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
835     if (ret < 0) {
836         TRANS_LOGE(TRANS_STREAM,
837             "Fail to enable direct send for streamFd=%{public}d, rrno=%{public}d", streamFd, FtGetErrno());
838         return false;
839     }
840     TRANS_LOGI(TRANS_STREAM, "Success to enable direct send for streamFd=%{public}d", streamFd);
841     return true;
842 }
843 
EnableSemiReliable(int streamFd) const844 bool VtpStreamSocket::EnableSemiReliable(int streamFd) const
845 {
846     int32_t enable = 1;
847     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
848     if (ret < 0) {
849         TRANS_LOGE(TRANS_STREAM,
850             "Fail to enable direct send for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
851         return false;
852     }
853     TRANS_LOGI(TRANS_STREAM, "Success to enable semi reliable for streamFd=%{public}d", streamFd);
854     return true;
855 }
856 
RegisterMetricCallback(bool isServer)857 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
858 {
859     VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
860     auto self = this->GetSelf();
861     VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
862     int regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
863     int value = 1;
864     auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
865     if (err < 0) {
866         TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
867         return;
868     }
869     TRANS_LOGD(TRANS_STREAM, "FtSetSockOpt start success");
870     if (isServer) {
871         if (regStatisticsRet == 0) {
872             TRANS_LOGI(TRANS_STREAM,
873                 "Success to register the stream callback function at server side. streamFd=%{public}d", streamFd_);
874         } else {
875             TRANS_LOGE(TRANS_STREAM,
876                 "Fail to register the stream callback function at server side. streamFd=%{public}d, errno=%{public}d",
877                 streamFd_, regStatisticsRet);
878         }
879     } else {
880         if (regStatisticsRet == 0) {
881             TRANS_LOGI(TRANS_STREAM,
882                 "Success to register the stream callback function at client side. streamFd=%{public}d", streamFd_);
883         } else {
884             TRANS_LOGE(TRANS_STREAM,
885                 "Fail to register the stream callback function at client side. streamFd=%{public}d, errno=%{public}d",
886                 streamFd_, regStatisticsRet);
887         }
888     }
889 }
890 
Accept()891 bool VtpStreamSocket::Accept()
892 {
893     TRANS_LOGD(TRANS_STREAM, "enter.");
894     auto fd = FtAccept(listenFd_, nullptr, nullptr);
895     TRANS_LOGI(TRANS_STREAM, "accept streamFd=%{public}d", fd);
896     if (fd == -1) {
897         TRANS_LOGE(TRANS_STREAM, "errno=%{public}d", FtGetErrno());
898         return false;
899     }
900 
901     sockaddr remoteAddr {};
902     socklen_t remoteAddrLen = sizeof(remoteAddr);
903     auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
904     if (ret != ERR_OK) {
905         TRANS_LOGE(TRANS_STREAM, "get name failed, fd=%{public}d", fd);
906         FtClose(fd);
907         return false;
908     }
909 
910     char host[ADDR_MAX_SIZE];
911     if (remoteAddr.sa_family == AF_INET) {
912         auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
913         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
914         remoteIpPort_.port = v4Addr->sin_port;
915     } else {
916         auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
917         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
918         remoteIpPort_.port = v6Addr->sin6_port;
919     }
920 
921     TRANS_LOGD(TRANS_STREAM, "Accept a client remotePort=%{public}d", remoteIpPort_.port);
922 
923     if (SetSocketEpollMode(fd) != ERR_OK) {
924         TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, fd=%{public}d", fd);
925         FtClose(fd);
926         return false;
927     }
928 
929     std::lock_guard<std::mutex> guard(streamSocketLock_);
930     streamFd_ = fd;
931     configCv_.notify_all();
932 
933     if (streamReceiver_ != nullptr) {
934         TRANS_LOGI(TRANS_STREAM, "notify stream connected!");
935         streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
936     }
937 
938     bool isServer = true;
939     RegisterMetricCallback(isServer); /* register the callback function */
940     /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
941 #ifdef FILLP_SUPPORT_BW_DET
942     EnableBwEstimationAlgo(streamFd_, isServer);
943 #endif
944 #ifdef FILLP_SUPPORT_CQE
945     EnableJitterDetectionAlgo(streamFd_);
946 #endif
947 
948     TRANS_LOGI(TRANS_STREAM, "accept success!");
949     return true;
950 }
951 
EpollTimeout(int fd,int timeout)952 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
953 {
954     struct SpungeEpollEvent events[MAX_EPOLL_NUM];
955     (void)memset_s(events, sizeof(events), 0, sizeof(events));
956     while (true) {
957         FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
958         if (fdNum <= 0) {
959             TRANS_LOGE(TRANS_STREAM,
960                 "FtEpollWait failed, ret=%{public}d, errno=%{public}d", fdNum, FtGetErrno());
961             return -FtGetErrno();
962         }
963 
964         for (FILLP_INT i = 0; i < fdNum; i++) {
965             if (events[i].data.fd != fd) {
966                 continue;
967             }
968 
969             if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
970                 TRANS_LOGE(TRANS_STREAM,
971                     "EpollTimeout, something may be wrong in this socket, fd=%{public}d, events=%{public}u", fd,
972                     (unsigned int)events[i].events);
973                 return -1;
974             }
975 
976             if (events[i].events & SPUNGE_EPOLLIN) {
977                 return SOFTBUS_OK;
978             }
979         }
980     }
981 }
982 
SetSocketEpollMode(int fd)983 int VtpStreamSocket::SetSocketEpollMode(int fd)
984 {
985     if (!SetNonBlockMode(fd, StreamAttr(true))) {
986         TRANS_LOGE(TRANS_STREAM, "SetNonBlockMode failed, errno=%{public}d", FtGetErrno());
987         return -1;
988     }
989 
990     struct SpungeEpollEvent event = {0};
991     event.events = SPUNGE_EPOLLIN;
992     event.data.fd = fd;
993 
994     auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
995     if (ret != ERR_OK) {
996         TRANS_LOGE(TRANS_STREAM, "FtEpollCtl failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
997         return ret;
998     }
999 
1000     TRANS_LOGD(TRANS_STREAM, "SetNonBlockMode success");
1001     return SOFTBUS_OK;
1002 }
1003 
InsertBufferLength(int num,int length,uint8_t * output) const1004 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
1005 {
1006     for (int i = 0; i < length; i++) {
1007         output[length - 1 - i] = static_cast<unsigned int>(
1008             ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1009     }
1010 }
1011 
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1012 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1013 {
1014     std::unique_ptr<IStream> stream = nullptr;
1015     switch (streamType_) {
1016         case VIDEO_SLICE_STREAM:
1017             TRANS_LOGD(TRANS_STREAM, "do not support VIDEO_SLICE_STREAM streamType=%{public}d", streamType_);
1018             break;
1019         case COMMON_VIDEO_STREAM:
1020         case COMMON_AUDIO_STREAM:
1021             TRANS_LOGD(TRANS_STREAM,
1022                 "streamType=%{public}d, seqNum=%{public}d, streamId=%{public}d",
1023                 streamType_, info.seqNum, info.streamId);
1024             stream = IStream::MakeCommonStream(data, info);
1025             break;
1026         case RAW_STREAM:
1027             TRANS_LOGD(TRANS_STREAM, "streamType=%{public}d", streamType_);
1028             stream = IStream::MakeRawStream(data, info);
1029             break;
1030         default:
1031             TRANS_LOGE(TRANS_STREAM, "do not support streamType=%{public}d", streamType_);
1032             break;
1033     }
1034     if (stream == nullptr) {
1035         TRANS_LOGE(TRANS_STREAM, "IStream construct error");
1036         return nullptr;
1037     }
1038 
1039     return stream;
1040 }
1041 
RecvStreamLen()1042 int32_t VtpStreamSocket::RecvStreamLen()
1043 {
1044     int32_t hdrSize = FRAME_HEADER_LEN;
1045     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1046         hdrSize = streamHdrSize_;
1047     }
1048 
1049     int32_t len = -1;
1050     int32_t timeout = -1;
1051     auto buffer = std::make_unique<char[]>(hdrSize);
1052     if (EpollTimeout(streamFd_, timeout) == 0) {
1053         do {
1054             len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1055         } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1056     }
1057     TRANS_LOGD(TRANS_STREAM, "recv frame header, len=%{public}d, scene=%{public}d", len, scene_);
1058 
1059     if (len <= 0) {
1060         TRANS_LOGE(TRANS_STREAM, "len invalid, len=%{public}d", len);
1061         return -1;
1062     }
1063 
1064     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1065         std::lock_guard<std::mutex> guard(streamSocketLock_);
1066         if (streamReceiver_ != nullptr) {
1067             return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1068         }
1069     }
1070 
1071     return ntohl(*reinterpret_cast<int *>(buffer.get()));
1072 }
1073 
ProcessCommonDataStream(std::unique_ptr<char[]> & dataBuffer,int32_t & dataLength,std::unique_ptr<char[]> & extBuffer,int32_t & extLen,StreamFrameInfo & info)1074 bool VtpStreamSocket::ProcessCommonDataStream(std::unique_ptr<char[]> &dataBuffer,
1075     int32_t &dataLength, std::unique_ptr<char[]> &extBuffer, int32_t &extLen, StreamFrameInfo &info)
1076 {
1077     TRANS_LOGD(TRANS_STREAM, "recv common stream");
1078     int32_t decryptedLength = dataLength;
1079     auto decryptedBuffer = std::move(dataBuffer);
1080 
1081     int32_t plainDataLength = decryptedLength - GetEncryptOverhead();
1082     if (plainDataLength <= 0) {
1083         TRANS_LOGE(TRANS_STREAM, "Decrypt failed, invalid decryptedLen=%{public}d", decryptedLength);
1084         return false;
1085     }
1086     std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1087     ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1088     if (decLen != plainDataLength) {
1089         TRANS_LOGE(TRANS_STREAM,
1090             "Decrypt failed, dataLen=%{public}d, decryptedLen=%{public}zd", plainDataLength, decLen);
1091         return false;
1092     }
1093     auto header = plainData.get();
1094     StreamDepacketizer decode(streamType_);
1095     if (plainDataLength < static_cast<int32_t>(sizeof(CommonHeader))) {
1096         TRANS_LOGE(TRANS_STREAM,
1097             "failed, plainDataLen=%{public}d, CommonHeader=%{public}zu", plainDataLength, sizeof(CommonHeader));
1098         return false;
1099     }
1100     decode.DepacketizeHeader(header);
1101 
1102     auto buffer = plainData.get() + sizeof(CommonHeader);
1103     decode.DepacketizeBuffer(buffer, plainDataLength - sizeof(CommonHeader));
1104 
1105     extBuffer = decode.GetUserExt();
1106     extLen = decode.GetUserExtSize();
1107     info = decode.GetFrameInfo();
1108     dataBuffer = decode.GetData();
1109     dataLength = decode.GetDataLength();
1110     if (dataLength <= 0) {
1111         TRANS_LOGE(TRANS_STREAM, "common depacketize error, dataLen=%{public}d", dataLength);
1112         return false;
1113     }
1114     return true;
1115 }
1116 
DoStreamRecv()1117 void VtpStreamSocket::DoStreamRecv()
1118 {
1119     while (isStreamRecv_) {
1120         std::unique_ptr<char[]> dataBuffer = nullptr;
1121         std::unique_ptr<char[]> extBuffer = nullptr;
1122         int32_t extLen = 0;
1123         StreamFrameInfo info = {};
1124         TRANS_LOGD(TRANS_STREAM, "recv stream");
1125         int32_t dataLength = VtpStreamSocket::RecvStreamLen();
1126         if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1127             TRANS_LOGE(TRANS_STREAM, "read frame length error, dataLength=%{public}d", dataLength);
1128             break;
1129         }
1130         TRANS_LOGD(TRANS_STREAM,
1131             "recv a new frame, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1132         dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1133 
1134         if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1135             if (!ProcessCommonDataStream(dataBuffer, dataLength, extBuffer, extLen, info)) {
1136                 break;
1137             }
1138         }
1139 
1140         StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1141         std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1142         if (stream == nullptr) {
1143             TRANS_LOGE(TRANS_STREAM, "MakeStreamData failed, stream is null");
1144             break;
1145         }
1146 
1147         TRANS_LOGD(TRANS_STREAM,
1148             "recv frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1149 
1150         if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1151             std::lock_guard<std::mutex> guard(streamSocketLock_);
1152             if (streamReceiver_ != nullptr) {
1153                 streamReceiver_->OnStreamReceived(std::move(stream));
1154                 continue;
1155             }
1156         }
1157 
1158         PutStream(std::move(stream));
1159         TRANS_LOGD(TRANS_STREAM, "put frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1160     }
1161     TRANS_LOGI(TRANS_STREAM, "recv thread exit");
1162 }
1163 
RecvStream(int32_t dataLength)1164 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int32_t dataLength)
1165 {
1166     auto buffer = std::make_unique<char[]>(dataLength);
1167     int32_t recvLen = 0;
1168     while (recvLen < dataLength) {
1169         int32_t ret = -1;
1170         int32_t timeout = -1;
1171 
1172         if (EpollTimeout(streamFd_, timeout) == 0) {
1173             do {
1174                 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1175             } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1176         }
1177 
1178         if (ret == -1) {
1179             TRANS_LOGE(TRANS_STREAM, "read frame failed, errno=%{public}d", FtGetErrno());
1180             return nullptr;
1181         }
1182 
1183         recvLen += ret;
1184     }
1185     return std::unique_ptr<char[]>(buffer.release());
1186 }
1187 
SetDefaultConfig(int fd)1188 void VtpStreamSocket::SetDefaultConfig(int fd)
1189 {
1190     if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1191         TRANS_LOGW(TRANS_STREAM, "SetIpTos failed");
1192     }
1193     // Set Fillp direct sending
1194     if (!EnableDirectlySend(fd)) {
1195         TRANS_LOGW(TRANS_STREAM, "EnableDirectlySend failed");
1196     }
1197 
1198     if (!EnableSemiReliable(fd)) {
1199         TRANS_LOGW(TRANS_STREAM, "EnableSemiReliable failed");
1200     }
1201     // Set Fillp Differentiated Transmission
1202     FILLP_BOOL enable = 1;
1203     if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1204         TRANS_LOGW(TRANS_STREAM, "Set differ transmit failed");
1205     }
1206 
1207     if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1208         TRANS_LOGW(TRANS_STREAM, "Set recv buff failed");
1209     }
1210 
1211     if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1212         TRANS_LOGW(TRANS_STREAM, "Set send buff failed");
1213     }
1214 
1215     if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1216         TRANS_LOGW(TRANS_STREAM, "Set recv cache failed");
1217     }
1218 
1219     if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1220         TRANS_LOGW(TRANS_STREAM, "Set send cache failed");
1221     }
1222 }
1223 
SetIpTos(int fd,const StreamAttr & tos)1224 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1225 {
1226     auto tmp = tos.GetIntValue();
1227     if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1228         TRANS_LOGE(TRANS_STREAM, "SetIpTos wrong! fd=%{public}d, errno=%{public}d", fd, FtGetErrno());
1229         return false;
1230     }
1231 
1232     TRANS_LOGD(TRANS_STREAM, "Success to set ip tos: fd=%{public}d, tos=%{public}d", fd, tmp);
1233     return true;
1234 }
1235 
GetIpTos(int type) const1236 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1237 {
1238     static_cast<void>(type);
1239     int tos;
1240     int size = sizeof(tos);
1241 
1242     if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1243         TRANS_LOGE(TRANS_STREAM, "FtGetSockOpt errno=%{public}d", FtGetErrno());
1244         return std::move(StreamAttr());
1245     }
1246 
1247     return std::move(StreamAttr(tos));
1248 }
1249 
GetStreamSocketFd(int type) const1250 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1251 {
1252     static_cast<void>(type);
1253     return std::move(StreamAttr(streamFd_));
1254 }
1255 
GetListenSocketFd(int type) const1256 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1257 {
1258     static_cast<void>(type);
1259     return std::move(StreamAttr(listenFd_));
1260 }
1261 
SetSocketBoundInner(int fd,std::string ip) const1262 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1263 {
1264     auto boundIp = (ip.empty()) ? localIpPort_.ip : ip;
1265     struct ifaddrs *ifList = nullptr;
1266     if (getifaddrs(&ifList) < 0) {
1267         TRANS_LOGE(TRANS_STREAM,
1268             "get interface address return errno=%{public}d, strerror=%{public}s", errno, strerror(errno));
1269         return false;
1270     }
1271 
1272     struct ifaddrs *ifa = nullptr;
1273     for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1274         if (ifa->ifa_addr == nullptr) {
1275             continue;
1276         }
1277         if (ifa->ifa_addr->sa_family != AF_INET) {
1278             continue;
1279         }
1280 
1281         char host[ADDR_MAX_SIZE];
1282         std::string devName(ifa->ifa_name);
1283         if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1284             host, ADDR_MAX_SIZE)) == 0) {
1285             TRANS_LOGI(TRANS_STREAM, "current use interface to bind to socket. ifName=%{public}s", ifa->ifa_name);
1286             auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1287             if (err < 0) {
1288                 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
1289                 freeifaddrs(ifList);
1290                 return false;
1291             }
1292             break;
1293         }
1294     }
1295     freeifaddrs(ifList);
1296 
1297     return true;
1298 }
1299 
SetSocketBindToDevices(int type,const StreamAttr & ip)1300 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1301 {
1302     static_cast<void>(type);
1303     auto tmp = ip.GetStrValue();
1304     auto boundIp = (tmp.empty()) ? localIpPort_.ip : tmp;
1305     return SetSocketBoundInner(streamFd_, boundIp);
1306 }
1307 
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1308 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1309 {
1310     std::unique_lock<std::mutex> lock(streamSocketLock_);
1311     if (streamFd_ == -1) {
1312         configCv_.wait(lock, [this] { return streamFd_ != -1; });
1313     }
1314     TRANS_LOGD(TRANS_STREAM, "set vtp stack config, streamFd=%{public}d", streamFd_);
1315     return SetVtpStackConfig(type, value);
1316 }
1317 
SetVtpStackConfig(int type,const StreamAttr & value)1318 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1319 {
1320     if (streamFd_ == -1) {
1321         TRANS_LOGI(TRANS_STREAM, "set vtp stack config when streamFd is legal, type=%{public}d", type);
1322         auto self = GetSelf();
1323         std::thread([self, type, value]() {
1324             const std::string threadName = "OS_setVtpCfg";
1325             pthread_setname_np(pthread_self(), threadName.c_str());
1326             self->SetVtpStackConfigDelayed(type, value);
1327             }).detach();
1328         return true;
1329     }
1330 
1331     if (value.GetType() == INT_TYPE) {
1332         int intVal = value.GetIntValue();
1333         int ret = FtConfigSet(type, &intVal, &streamFd_);
1334         if (ret != SOFTBUS_OK) {
1335             TRANS_LOGE(TRANS_STREAM,
1336                 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1337             return false;
1338         }
1339 
1340         TRANS_LOGI(TRANS_STREAM,
1341             "setVtpConfig success, type=%{public}d, streamFd=%{public}d, value=%{public}d", type, streamFd_, intVal);
1342         return true;
1343     }
1344 
1345     if (value.GetType() == BOOL_TYPE) {
1346         bool flag = value.GetBoolValue();
1347         int ret = FtConfigSet(type, &flag, &streamFd_);
1348         if (ret != SOFTBUS_OK) {
1349             TRANS_LOGE(TRANS_STREAM,
1350                 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1351             return false;
1352         }
1353 
1354         TRANS_LOGI(TRANS_STREAM,
1355             "setVtpConfig success, streamFd=%{public}d, flag=%{public}d, type=%{public}d", type, streamFd_, flag);
1356         return true;
1357     }
1358 
1359     TRANS_LOGE(TRANS_STREAM, "UNKNOWN TYPE!");
1360     return false;
1361 }
1362 
GetVtpStackConfig(int type) const1363 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1364 {
1365     int intVal = -1;
1366     int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1367     int ret = FtConfigGet(type, &intVal, &configFd);
1368     if (ret != SOFTBUS_OK) {
1369         TRANS_LOGE(TRANS_STREAM,
1370             "FtConfigGet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1371         return std::move(StreamAttr());
1372     }
1373 
1374     int valType = ValueType::UNKNOWN;
1375     for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1376         if (it->second != type) {
1377             continue;
1378         }
1379 
1380         valType = optFuncMap_.at(it->first).valueType;
1381         break;
1382     }
1383 
1384     if (valType != ValueType::UNKNOWN) {
1385         for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1386             if (it->second != type) {
1387                 continue;
1388             }
1389 
1390             valType = optFuncMap_.at(it->first).valueType;
1391             break;
1392         }
1393     }
1394 
1395     if (valType == BOOL_TYPE) {
1396         return std::move(StreamAttr(!!intVal));
1397     }
1398 
1399     return std::move(StreamAttr(intVal));
1400 }
1401 
SetNonBlockMode(int fd,const StreamAttr & value)1402 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1403 {
1404     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1405     if (flags < 0) {
1406         TRANS_LOGE(TRANS_STREAM, "failed to get FtFcntl, flags=%{public}d", flags);
1407         flags = 0;
1408     }
1409     bool nonBlock = value.GetBoolValue();
1410 
1411     flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1412         static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1413 
1414     FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1415     if (res < 0) {
1416         TRANS_LOGE(TRANS_STREAM, "failed to set FtFcntl, res=%{public}d", res);
1417         return false;
1418     }
1419 
1420     TRANS_LOGI(TRANS_STREAM, "Successfully to set fd=%{public}d, nonBlock=%{public}d", fd, nonBlock);
1421     return true;
1422 }
1423 
GetNonBlockMode(int fd) const1424 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1425 {
1426     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1427     if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1428         return std::move(StreamAttr(true));
1429     }
1430 
1431     return std::move(StreamAttr(false));
1432 }
1433 
GetIp(int type) const1434 StreamAttr VtpStreamSocket::GetIp(int type) const
1435 {
1436     if (type == LOCAL_IP) {
1437         return std::move(StreamAttr(localIpPort_.ip));
1438     }
1439 
1440     return std::move(StreamAttr(remoteIpPort_.ip));
1441 }
1442 
GetPort(int type) const1443 StreamAttr VtpStreamSocket::GetPort(int type) const
1444 {
1445     if (type == LOCAL_PORT) {
1446         return std::move(StreamAttr(localIpPort_.port));
1447     }
1448     return std::move(StreamAttr(remoteIpPort_.port));
1449 }
1450 
SetStreamType(int type,const StreamAttr & value)1451 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1452 {
1453     if (type != STREAM_TYPE_INT) {
1454         return false;
1455     }
1456 
1457     streamType_ = value.GetIntValue();
1458     return true;
1459 }
1460 
GetStreamType(int type) const1461 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1462 {
1463     if (type != STREAM_TYPE_INT) {
1464         return std::move(StreamAttr());
1465     }
1466 
1467     return std::move(StreamAttr(streamType_));
1468 }
1469 
SetStreamScene(int type,const StreamAttr & value)1470 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1471 {
1472     static_cast<void>(type);
1473     if (value.GetType() != INT_TYPE) {
1474         TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1475         return false;
1476     }
1477     scene_ = value.GetIntValue();
1478     TRANS_LOGI(TRANS_STREAM, "Set scene=%{public}d", scene_);
1479     return true;
1480 }
1481 
SetStreamHeaderSize(int type,const StreamAttr & value)1482 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1483 {
1484     static_cast<void>(type);
1485     if (value.GetType() != INT_TYPE) {
1486         TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1487         return false;
1488     }
1489     streamHdrSize_ = value.GetIntValue();
1490     TRANS_LOGI(TRANS_STREAM, "Set headerSize=%{public}d", streamHdrSize_);
1491     return true;
1492 }
1493 
NotifyStreamListener()1494 void VtpStreamSocket::NotifyStreamListener()
1495 {
1496     while (isStreamRecv_) {
1497         int streamNum = GetStreamNum();
1498         if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1499             TRANS_LOGW(TRANS_STREAM, "Too many data in receiver, streamNum=%{public}d", streamNum);
1500         }
1501 
1502         auto stream = TakeStream();
1503         if (stream == nullptr) {
1504             TRANS_LOGE(TRANS_STREAM, "Pop stream failed");
1505             break;
1506         }
1507 
1508         std::lock_guard<std::mutex> guard(streamSocketLock_);
1509         if (streamReceiver_ != nullptr) {
1510             TRANS_LOGD(TRANS_STREAM, "notify listener");
1511             streamReceiver_->OnStreamReceived(std::move(stream));
1512             TRANS_LOGD(TRANS_STREAM, "notify listener done.");
1513         }
1514     }
1515     TRANS_LOGI(TRANS_STREAM, "notify thread exit");
1516 }
1517 
GetEncryptOverhead() const1518 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1519 {
1520     return OVERHEAD_LEN;
1521 }
1522 
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1523 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1524 {
1525     if (in == nullptr || out == nullptr) {
1526         TRANS_LOGE(TRANS_STREAM, "param invalid");
1527         return SOFTBUS_INVALID_PARAM;
1528     }
1529     AesGcmCipherKey cipherKey = {0};
1530 
1531     if (inLen - OVERHEAD_LEN > outLen) {
1532         TRANS_LOGE(TRANS_STREAM, "Encrypt invalid para.");
1533         return SOFTBUS_INVALID_PARAM;
1534     }
1535 
1536     cipherKey.keyLen = SESSION_KEY_LENGTH;
1537     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1538         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1539         return SOFTBUS_MEM_ERR;
1540     }
1541 
1542     int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1543     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1544     if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1545         TRANS_LOGE(TRANS_STREAM, "Encrypt Data fail. ret=%{public}d", ret);
1546         return SOFTBUS_ENCRYPT_ERR;
1547     }
1548     return outLen;
1549 }
1550 
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1551 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1552 {
1553     if (in == nullptr || out == nullptr) {
1554         TRANS_LOGE(TRANS_STREAM, "param invalid");
1555         return SOFTBUS_INVALID_PARAM;
1556     }
1557     AesGcmCipherKey cipherKey = {0};
1558 
1559     if (inLen - OVERHEAD_LEN > outLen) {
1560         TRANS_LOGE(TRANS_STREAM, "Decrypt invalid para.");
1561         return SOFTBUS_INVALID_PARAM;
1562     }
1563 
1564     cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1565     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1566         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1567         return SOFTBUS_MEM_ERR;
1568     }
1569     int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1570     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1571     if (ret != SOFTBUS_OK) {
1572         TRANS_LOGE(TRANS_STREAM, "Decrypt Data fail. ret=%{public}d ", ret);
1573         return SOFTBUS_DECRYPT_ERR;
1574     }
1575 
1576     return outLen;
1577 }
1578 
SetMultiLayer(const void * para)1579 int32_t VtpStreamSocket::SetMultiLayer(const void *para)
1580 {
1581     int fd = GetStreamSocketFd(FD).GetIntValue();
1582     return VtpSetSocketMultiLayer(fd, &onStreamEvtCb_, para);
1583 }
1584 
CreateServerProcessThread()1585 void VtpStreamSocket::CreateServerProcessThread()
1586 {
1587     auto self = this->GetSelf();
1588     std::thread([self]() {
1589         const std::string threadName = "OS_sntfStmLsn";
1590         pthread_setname_np(pthread_self(), threadName.c_str());
1591         self->NotifyStreamListener();
1592         }).detach();
1593 
1594     std::thread([self]() {
1595         const std::string threadName = "OS_sdstyStmSkt";
1596         pthread_setname_np(pthread_self(), threadName.c_str());
1597         if (!self->Accept()) {
1598             self->DestroyStreamSocket();
1599             return;
1600         }
1601         self->DoStreamRecv();
1602         self->DestroyStreamSocket();
1603         }).detach();
1604 
1605     bool &isDestroyed = isDestroyed_;
1606     std::thread([self, &isDestroyed]() {
1607         const std::string threadName = "OS_sfillStatic";
1608         pthread_setname_np(pthread_self(), threadName.c_str());
1609         while (!isDestroyed) {
1610             self->FillpAppStatistics();
1611             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1612         }
1613         }).detach();
1614 }
1615 
CreateClientProcessThread()1616 void VtpStreamSocket::CreateClientProcessThread()
1617 {
1618     auto self = this->GetSelf();
1619     std::thread([self]() {
1620         const std::string threadName = "OS_cntfStmLsn";
1621         pthread_setname_np(pthread_self(), threadName.c_str());
1622         self->NotifyStreamListener();
1623         }).detach();
1624 
1625     std::thread([self]() {
1626         const std::string threadName = "OS_cdstyStmSkt";
1627         pthread_setname_np(pthread_self(), threadName.c_str());
1628         self->DoStreamRecv();
1629         self->DestroyStreamSocket();
1630         }).detach();
1631 
1632     bool &isDestroyed = isDestroyed_;
1633     std::thread([self, &isDestroyed]() {
1634         const std::string threadName = "OS_cfillStatic";
1635         pthread_setname_np(pthread_self(), threadName.c_str());
1636         while (!isDestroyed) {
1637             self->FillpAppStatistics();
1638             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1639         }
1640         }).detach();
1641 }
1642 } // namespace SoftBus
1643 } // namespace Communication
1644