• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "vtp_stream_socket.h"
17 
18 #include <ifaddrs.h>
19 #include <thread>
20 
21 #include "fillpinc.h"
22 #include "g_enhance_sdk_func.h"
23 #include "raw_stream_data.h"
24 #include "session.h"
25 #include "legacy/softbus_hisysevt_transreporter.h"
26 #include "softbus_adapter_crypto.h"
27 #include "softbus_adapter_socket.h"
28 #include "stream_depacketizer.h"
29 #include "stream_packetizer.h"
30 #include "trans_event.h"
31 
32 namespace Communication {
33 namespace SoftBus {
34 bool g_logOn = false;
35 const int32_t FEED_BACK_PERIOD = 1;  /* feedback period of fillp stream traffic statistics is 1s */
36 const int32_t MS_PER_SECOND = 1000;
37 const int32_t US_PER_MS = 1000;
38 static constexpr int32_t MAX_PORT = 65535;
39 
40 namespace {
PrintOptionInfo(int type,const StreamAttr & value)41 void PrintOptionInfo(int type, const StreamAttr &value)
42 {
43     switch (value.GetType()) {
44         case INT_TYPE:
45             TRANS_LOGI(TRANS_STREAM,
46                 "Int option: type=%{public}d, value=%{public}d", type, value.GetIntValue());
47             break;
48         case BOOL_TYPE:
49             TRANS_LOGI(TRANS_STREAM,
50                 "Bool option: type=%{public}d, value=%{public}d", type, value.GetBoolValue());
51             break;
52         case STRING_TYPE:
53             TRANS_LOGD(TRANS_STREAM,
54                 "String option: type=%{public}d, value=%{public}s", type, value.GetStrValue().c_str());
55             break;
56         default:
57             TRANS_LOGE(TRANS_STREAM, "Wrong StreamAttr!");
58             (void)type;
59     }
60 }
61 } // namespace
62 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
63 
64 std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
65 std::mutex VtpStreamSocket::g_streamSocketLockMapLock_;
66 std::map<int, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
67 std::mutex VtpStreamSocket::g_streamSocketMapLock_;
68 
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)69 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo* frameInfo,
70     const Communication::SoftBus::StreamFrameInfo* streamFrameInfo)
71 {
72     frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
73     frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
74     frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
75     frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
76     frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
77     frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
78 }
79 
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)80 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
81 {
82     std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
83     if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
84         TRANS_LOGE(TRANS_STREAM, "streamsocketlock for the fd already exists. fd=%{public}d", fd);
85         return;
86     }
87 
88     g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
89 }
90 
AddStreamSocketListener(int fd,std::shared_ptr<VtpStreamSocket> streamreceiver)91 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
92 {
93     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
94     if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
95         TRANS_LOGE(TRANS_STREAM, "streamreceiver for the fd already exists. fd=%{public}d", fd);
96         return;
97     }
98 
99     g_streamSocketMap.emplace(std::pair<int, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
100 }
101 
RemoveStreamSocketLock(int fd)102 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
103 {
104     std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
105     if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
106         g_streamSocketLockMap.erase(fd);
107         TRANS_LOGI(TRANS_STREAM, "Remove streamsocketlock for the fd success. fd=%{public}d", fd);
108     } else {
109         TRANS_LOGE(TRANS_STREAM,
110             "Streamsocketlock for the fd not exist in the map. fd=%{public}d", fd);
111     }
112 }
113 
RemoveStreamSocketListener(int fd)114 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
115 {
116     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
117     if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
118         g_streamSocketMap.erase(fd);
119         TRANS_LOGI(TRANS_STREAM, "Remove streamreceiver for the fd success. fd=%{public}d", fd);
120     } else {
121         TRANS_LOGE(TRANS_STREAM, "Streamreceiver for the fd not exist in the map. fd=%{public}d", fd);
122     }
123 }
124 
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)125 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
126 {
127     OptionFunc fun = {
128         valueType, set, get
129     };
130     optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
131 }
132 
VtpStreamSocket()133 VtpStreamSocket::VtpStreamSocket()
134 {
135     InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
136     InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
137     InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
138     InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
139     InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
140     InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
141     InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
142     InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
143     InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
144     InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
145     InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
146     InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
147     InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
148         &VtpStreamSocket::GetVtpStackConfig);
149     InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
150         &VtpStreamSocket::GetVtpStackConfig);
151     InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
152         &VtpStreamSocket::GetVtpStackConfig);
153     InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
154         &VtpStreamSocket::GetVtpStackConfig);
155     InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
156         &VtpStreamSocket::GetVtpStackConfig);
157     InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
158         &VtpStreamSocket::GetVtpStackConfig);
159     InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
160         &VtpStreamSocket::GetVtpStackConfig);
161     InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
162         &VtpStreamSocket::GetVtpStackConfig);
163     InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
164         &VtpStreamSocket::GetVtpStackConfig);
165     InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
166         &VtpStreamSocket::GetVtpStackConfig);
167     InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
168         &VtpStreamSocket::GetVtpStackConfig);
169     InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
170         &VtpStreamSocket::GetVtpStackConfig);
171     InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
172     InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
173     InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
174     InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
175 
176     scene_ = UNKNOWN_SCENE;
177 }
178 
~VtpStreamSocket()179 VtpStreamSocket::~VtpStreamSocket()
180 {
181     TRANS_LOGW(TRANS_STREAM, "~VtpStreamSocket");
182 }
183 
GetSelf()184 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
185 {
186     return shared_from_this();
187 }
188 
HandleFillpFrameStats(int fd,const FtEventCbkInfo * info)189 int VtpStreamSocket::HandleFillpFrameStats(int fd, const FtEventCbkInfo *info)
190 {
191     if (info == nullptr) {
192         TRANS_LOGE(TRANS_STREAM, "stats info is nullptr");
193         return SOFTBUS_INVALID_PARAM;
194     }
195     StreamSendStats stats = {};
196     if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
197         sizeof(info->info.frameSendStats)) != EOK) {
198         TRANS_LOGE(TRANS_STREAM, "streamStats info memcpy fail");
199         return SOFTBUS_MEM_ERR;
200     }
201 
202     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
203     auto itListener = g_streamSocketMap.find(fd);
204     if (itListener != g_streamSocketMap.end()) {
205         if (itListener->second->streamReceiver_ != nullptr) {
206             TRANS_LOGD(TRANS_STREAM, "OnFrameStats enter");
207             itListener->second->streamReceiver_->OnFrameStats(&stats);
208         } else {
209             TRANS_LOGE(TRANS_STREAM, "streamReceiver_ is nullptr");
210         }
211     } else {
212         TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the fd is empty in the map. fd=%{public}d", fd);
213     }
214     return SOFTBUS_OK;
215 }
216 
HandleRipplePolicy(int fd,const FtEventCbkInfo * info)217 int VtpStreamSocket::HandleRipplePolicy(int fd, const FtEventCbkInfo *info)
218 {
219     if (info == nullptr) {
220         TRANS_LOGE(TRANS_STREAM, "policy info is nullptr");
221         return SOFTBUS_INVALID_PARAM;
222     }
223     TrafficStats stats;
224     (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
225     if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
226         sizeof(info->info.trafficData.stats)) != EOK) {
227         TRANS_LOGE(TRANS_STREAM, "RipplePolicy info memcpy fail");
228         return SOFTBUS_MEM_ERR;
229     }
230     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
231     auto itListener = g_streamSocketMap.find(fd);
232     if (itListener != g_streamSocketMap.end()) {
233         if (itListener->second->streamReceiver_ != nullptr) {
234             TRANS_LOGI(TRANS_STREAM, "OnRippleStats enter");
235             itListener->second->streamReceiver_->OnRippleStats(&stats);
236         } else {
237             TRANS_LOGE(TRANS_STREAM, "OnRippleStats streamReceiver_ is nullptr");
238         }
239     } else {
240         TRANS_LOGE(TRANS_STREAM,
241             "OnRippleStats streamReceiver for the fd is empty in the map. fd=%{public}d", fd);
242     }
243     return SOFTBUS_OK;
244 }
245 
HandleFillpFrameEvt(int fd,const FtEventCbkInfo * info)246 int VtpStreamSocket::HandleFillpFrameEvt(int fd, const FtEventCbkInfo *info)
247 {
248     if (info == nullptr) {
249         TRANS_LOGE(TRANS_STREAM, "fd is %{public}d, info is nullptr", fd);
250         return SOFTBUS_INVALID_PARAM;
251     }
252     std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
253     auto itListener = g_streamSocketMap.find(fd);
254     if (itListener != g_streamSocketMap.end()) {
255         return itListener->second->HandleFillpFrameEvtInner(fd, info);
256     } else {
257         TRANS_LOGE(TRANS_STREAM, "OnFillpFrameEvt for the fd is empty in the map. fd=%{public}d", fd);
258     }
259     return SOFTBUS_OK;
260 }
261 
HandleVtpFrameEvtPacked(int fd,OnFrameEvt cb,const FtEventCbkInfo * info)262 static int HandleVtpFrameEvtPacked(int fd, OnFrameEvt cb, const FtEventCbkInfo *info)
263 {
264     ClientEnhanceFuncList *pfnClientEnhanceFuncList = ClientEnhanceFuncListGet();
265     if (pfnClientEnhanceFuncList->handleVtpFrameEvt == nullptr) {
266         TRANS_LOGE(TRANS_STREAM, "func handleVtpFrameEvt is not registered");
267         return SOFTBUS_FUNC_NOT_SUPPORT;
268     }
269     return pfnClientEnhanceFuncList->handleVtpFrameEvt(fd, cb, info);
270 }
271 
HandleFillpFrameEvtInner(int fd,const FtEventCbkInfo * info)272 int VtpStreamSocket::HandleFillpFrameEvtInner(int fd, const FtEventCbkInfo *info)
273 {
274     if (onStreamEvtCb_ != nullptr) {
275         TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ enter");
276         return HandleVtpFrameEvtPacked(fd, onStreamEvtCb_, info);
277     } else {
278         TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ is nullptr");
279     }
280     return SOFTBUS_OK;
281 }
282 
283 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int fd,const FtEventCbkInfo * info,QosTv * metricList)284 void VtpStreamSocket::FillSupportDet(int fd, const FtEventCbkInfo *info, QosTv *metricList)
285 {
286     if (info == nullptr || metricList == nullptr) {
287         TRANS_LOGE(TRANS_STREAM, "info or metricList is nullptr");
288         return;
289     }
290     if (info->evt == FT_EVT_BW_DET) {
291         TRANS_LOGI(TRANS_STREAM,
292             "[Metric Return]: Fillp bandwidth information of socket fd=%{public}d is returned", fd);
293         TRANS_LOGI(TRANS_STREAM,
294             "[Metric Return]: Changed amount of current available bandwidth=%{public}d", info->info.bwInfo.bwStat);
295         TRANS_LOGI(TRANS_STREAM,
296             "[Metric Return]: Current bandwidth for receiving data rate=%{public}d kbps", info->info.bwInfo.rate);
297         metricList->type = BANDWIDTH_ESTIMATE_VALUE;
298         metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
299         metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
300     }
301     if (info->evt == FT_EVT_JITTER_DET) {
302         TRANS_LOGI(TRANS_STREAM,
303             "[Metric Return]: Fillp connection quality information of socket fd=%{public}d is returned", fd);
304         TRANS_LOGI(TRANS_STREAM,
305             "[Metric Return]: Predeicted network condition jitterLevel=%{public}d", info->info.jitterInfo.jitterLevel);
306         TRANS_LOGI(TRANS_STREAM,
307             "[Metric Return]: Current available receiving buffer time=%{public}d ms", info->info.jitterInfo.bufferTime);
308         metricList->type = JITTER_DETECTION_VALUE;
309         metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
310         metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
311     }
312 }
313 #endif
314 
IsVtpFrameSentEvtPacked(const FtEventCbkInfo * info)315 static bool IsVtpFrameSentEvtPacked(const FtEventCbkInfo *info)
316 {
317     ClientEnhanceFuncList *pfnClientEnhanceFuncList = ClientEnhanceFuncListGet();
318     if (pfnClientEnhanceFuncList->isVtpFrameSentEvt == nullptr) {
319         TRANS_LOGE(TRANS_STREAM, "Func isVtpFrameSentEvt is not registered");
320         return false;
321     }
322     return pfnClientEnhanceFuncList->isVtpFrameSentEvt(info);
323 }
324 
325 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int fd,const FtEventCbkInfo * info)326 int VtpStreamSocket::FillpStatistics(int fd, const FtEventCbkInfo *info)
327 {
328     if (info == nullptr || fd < 0) {
329         TRANS_LOGE(TRANS_STREAM, "param invalid fd is %{public}d", fd);
330         return SOFTBUS_INVALID_PARAM;
331     }
332     if (info->evt == FT_EVT_FRAME_STATS) {
333         TRANS_LOGI(TRANS_STREAM, "recv fillp frame stats");
334         return HandleFillpFrameStats(fd, info);
335     } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
336         TRANS_LOGI(TRANS_STREAM, "recv fillp traffic data");
337         return HandleRipplePolicy(fd, info);
338     } else if (IsVtpFrameSentEvtPacked(info)) {
339         TRANS_LOGI(TRANS_STREAM, "fd %{public}d recv fillp frame send evt", fd);
340         return HandleFillpFrameEvt(fd, info);
341     }
342 #ifdef FILLP_SUPPORT_BW_DET
343     if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
344         int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
345         int16_t tvCount = 1;
346         QosTv metricList = {};
347 
348         FillSupportDet(fd, info, &metricList);
349         metricList.info.wifiChannelInfo = {};
350         metricList.info.frameStatusInfo = {};
351 
352         std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
353         auto itLock = g_streamSocketLockMap.find(fd);
354         if (itLock != g_streamSocketLockMap.end()) {
355             std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
356             auto itListener = g_streamSocketMap.find(fd);
357             if (itListener != g_streamSocketMap.end()) {
358                 if (itListener->second->OnQosEvent == nullptr) {
359                     TRANS_LOGE(TRANS_STREAM, "OnQosEvent is empty");
360                     return SOFTBUS_INVALID_PARAM;
361                 }
362                 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
363                     const std::string threadName = "OS_qosEvent";
364                     pthread_setname_np(pthread_self(), threadName.c_str());
365                     std::lock_guard<std::mutex> guard(itLock->second);
366                     itListener->second->OnQosEvent(eventId, tvCount, &metricList);
367                 }).detach();
368             } else {
369                 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for fd=%{public}d is empty in the map", fd);
370             }
371         } else {
372             TRANS_LOGE(TRANS_STREAM, "StreamSocketLock for fd=%{public}d is empty in the map", fd);
373         }
374     } else {
375         TRANS_LOGE(TRANS_STREAM,
376             "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
377         return -1;
378     }
379 #endif
380     return SOFTBUS_OK;
381 }
382 
FillpAppStatistics()383 void VtpStreamSocket::FillpAppStatistics()
384 {
385     int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
386     int16_t tvCount = 1;
387     QosTv metricList = {};
388     FillpStatisticsPcb fillpPcbStats = {};
389     SoftBusSysTime fillpStatsGetTime = {0};
390 
391     int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
392     SoftBusGetTime(&fillpStatsGetTime);
393     if (getStatisticsRet == 0) {
394         metricList.type = STREAM_TRAFFIC_STASTICS;
395         metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
396             MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
397         metricList.info.appStatistics.periodRecvBits =
398             static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
399         metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
400         metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
401         metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
402         metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
403         metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
404         metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
405         metricList.info.appStatistics.periodRecvPktLossHighPrecision =
406             fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
407         metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
408         metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
409         metricList.info.appStatistics.periodSendPktLossHighPrecision =
410             fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
411         metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
412         metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
413 
414         TRANS_LOGD(TRANS_STREAM,
415             "Succeed to get fillp statistics information for streamfd=%{public}d", streamFd_);
416         TRANS_LOGD(TRANS_STREAM,
417             "[Metric Return]: periodRtt=%{public}d", fillpPcbStats.appFcStastics.periodRtt);
418 
419         std::lock_guard<std::mutex> guard(streamSocketLock_);
420 
421         if (streamReceiver_ != nullptr) {
422             TRANS_LOGD(TRANS_STREAM,
423                 "[Metric Notify]: Fillp traffic statistics information of socket is notified. streamfd=%{public}d",
424                 streamFd_);
425             streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
426         } else {
427             TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the streamFd is empty. streamFd=%{public}d", streamFd_);
428         }
429     } else {
430         TRANS_LOGE(TRANS_STREAM,
431             "Fail to get fillp statistics information for the streamfd. streamfd=%{public}d, errno=%{public}d",
432             streamFd_, FtGetErrno());
433     }
434 }
435 
CreateClient(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)436 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
437 {
438     int fd = CreateAndBindSocket(local, false);
439     if (fd == -1) {
440         TRANS_LOGE(TRANS_STREAM, "CreateAndBindSocket failed, errno=%{public}d", FtGetErrno());
441         DestroyStreamSocket();
442         return false;
443     }
444 
445     sessionKey_.second = sessionKey.second;
446     if (sessionKey_.first == nullptr) {
447         sessionKey_.first = new uint8_t[sessionKey_.second];
448     }
449     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
450         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
451         return false;
452     }
453 
454     streamType_ = streamType;
455     std::lock_guard<std::mutex> guard(streamSocketLock_);
456     streamFd_ = fd;
457     configCv_.notify_all();
458 
459     TRANS_LOGI(TRANS_STREAM,
460         "Success to create a client socket. fd=%{public}d, streamType=%{public}d", fd, streamType);
461     return true;
462 }
463 
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)464 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
465     std::pair<uint8_t*, uint32_t> sessionKey)
466 {
467     if (!CreateClient(local, streamType, sessionKey)) {
468         return false;
469     }
470     /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
471 #ifdef FILLP_SUPPORT_BW_DET
472     bool isServer = false;
473     EnableBwEstimationAlgo(streamFd_, isServer);
474 #endif
475 
476     bool connectRet = Connect(remote);
477     if (connectRet) {
478         bool isServer = false;
479         RegisterMetricCallback(isServer); /* register the callback function */
480     }
481     return connectRet;
482 }
483 
CreateServer(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)484 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
485 {
486     TRANS_LOGD(TRANS_STREAM, "enter.");
487     listenFd_ = CreateAndBindSocket(local, true);
488     if (listenFd_ == -1) {
489         TRANS_LOGE(TRANS_STREAM, "create listenFd failed, errno=%{public}d", FtGetErrno());
490         DestroyStreamSocket();
491         return false;
492     }
493 
494     bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
495     if (ret) {
496         TRANS_LOGE(TRANS_STREAM, "FtListen failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
497         DestroyStreamSocket();
498         return false;
499     }
500 
501     epollFd_ = FtEpollCreate();
502     if (epollFd_ < 0) {
503         TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
504         DestroyStreamSocket();
505         return false;
506     }
507     isStreamRecv_ = true;
508     streamType_ = streamType;
509     sessionKey_.second = sessionKey.second;
510     if (sessionKey_.first == nullptr) {
511         sessionKey_.first = new uint8_t[sessionKey_.second];
512     }
513     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
514         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
515         return false;
516     }
517 
518     CreateServerProcessThread();
519     TRANS_LOGI(TRANS_STREAM,
520         "CreateServer end, listenFd=%{public}d, epollFd=%{public}d, streamType=%{public}d", listenFd_, epollFd_,
521         streamType_);
522     return true;
523 }
524 
DestroyStreamSocket()525 void VtpStreamSocket::DestroyStreamSocket()
526 {
527     TRANS_LOGD(TRANS_STREAM, "enter.");
528     std::lock_guard<std::mutex> guard(streamSocketLock_);
529     if (isDestroyed_) {
530         TRANS_LOGI(TRANS_STREAM, "StreamSocket is already destroyed");
531         return;
532     }
533     if (listenFd_ != -1) {
534         TRANS_LOGI(TRANS_STREAM, "listenFd_ enter FtClose");
535         FtClose(listenFd_);
536         listenFd_ = -1;
537     }
538 
539     if (streamFd_ != -1) {
540         RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
541         RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
542         TRANS_LOGI(TRANS_STREAM, "streamFd_ enter FtClose");
543         FtClose(streamFd_);
544         streamFd_ = -1;
545     }
546 
547     if (epollFd_ != -1) {
548         TRANS_LOGI(TRANS_STREAM, "epollFd_ enter FtClose");
549         FtClose(epollFd_);
550         epollFd_ = -1;
551     }
552 
553     if (streamReceiver_ != nullptr) {
554         TRANS_LOGI(TRANS_STREAM, "DestroyStreamSocket receiver delete");
555         streamReceiver_->OnStreamStatus(STREAM_CLOSED);
556         streamReceiver_.reset();
557     }
558 
559     QuitStreamBuffer();
560     vtpInstance_->UpdateSocketStreamCount(false);
561     isDestroyed_ = true;
562     TRANS_LOGD(TRANS_STREAM, "ok");
563 }
564 
Connect(const IpAndPort & remote)565 bool VtpStreamSocket::Connect(const IpAndPort &remote)
566 {
567     if (remote.ip.empty()) {
568         TRANS_LOGE(TRANS_STREAM, "remote addr  error, ip is nullptr");
569         DestroyStreamSocket();
570         return false;
571     }
572 
573     TRANS_LOGD(TRANS_STREAM,
574         "Connect to server remotePort=%{public}d", remote.port);
575     remoteIpPort_ = remote;
576 
577     struct sockaddr_in remoteSockAddr;
578     remoteSockAddr.sin_family = AF_INET;
579     remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
580     remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
581 
582     int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
583     if (ret != SOFTBUS_OK) {
584         TRANS_LOGE(TRANS_STREAM, "FtConnect failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
585         DestroyStreamSocket();
586         return false;
587     }
588 
589     epollFd_ = FtEpollCreate();
590     if (epollFd_ < 0) {
591         TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
592         DestroyStreamSocket();
593         return false;
594     }
595 
596     if (SetSocketEpollMode(streamFd_) != ERR_OK) {
597         TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, streamFd=%{public}d", streamFd_);
598         DestroyStreamSocket();
599         return false;
600     }
601     isStreamRecv_ = true;
602     TRANS_LOGI(TRANS_STREAM, "Success to connect remote, and create a thread to recv data.");
603 
604     CreateClientProcessThread();
605     return true;
606 }
607 
EncryptStreamPacket(std::unique_ptr<IStream> stream,std::unique_ptr<char[]> & data,ssize_t & len)608 bool VtpStreamSocket::EncryptStreamPacket(std::unique_ptr<IStream> stream, std::unique_ptr<char[]> &data, ssize_t &len)
609 {
610     StreamPacketizer packet(streamType_, std::move(stream));
611     auto plainData = packet.PacketizeStream();
612     if (plainData == nullptr) {
613         TRANS_LOGE(TRANS_STREAM, "PacketizeStream failed");
614         return false;
615     }
616     len = packet.GetPacketLen() + GetEncryptOverhead();
617     TRANS_LOGD(TRANS_STREAM, "packetLen=%{public}zd, encryptOverhead=%{public}zd",
618         packet.GetPacketLen(), GetEncryptOverhead());
619     data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
620     ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
621     if (encLen != len) {
622         TRANS_LOGE(TRANS_STREAM, "encrypted failed, dataLen=%{public}zd, encLen=%{public}zd", len, encLen);
623         return false;
624     }
625     InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
626     len += FRAME_HEADER_LEN;
627 
628     return true;
629 }
630 
Send(std::unique_ptr<IStream> stream)631 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
632 {
633     TRANS_LOGD(TRANS_STREAM, "send in... streamType=%{public}d, dataSize=%{public}zd, extSize=%{public}zd",
634         streamType_, stream->GetBufferLen(), stream->GetExtBufferLen());
635 
636     if (!isBlocked_) {
637         isBlocked_ = true;
638         if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
639             TRANS_LOGE(TRANS_STREAM, "set non block mode fail");
640             return false;
641         }
642     }
643 
644     int32_t ret = -1;
645     std::unique_ptr<char[]> data = nullptr;
646     ssize_t len = 0;
647 
648     const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
649     if (streamFrameInfo == nullptr) {
650         TRANS_LOGE(TRANS_STREAM, "streamFrameInfo is null");
651         return false;
652     }
653     FrameInfo frameInfo;
654     ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
655 
656     if (streamType_ == RAW_STREAM) {
657         data = stream->GetBuffer();
658         len = stream->GetBufferLen();
659 
660         ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
661     } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
662         if (!EncryptStreamPacket(std::move(stream), data, len)) {
663             return false;
664         }
665         ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
666     }
667 
668     if (ret == -1) {
669         TRANS_LOGE(TRANS_STREAM, "send failed, errno=%{public}d", FtGetErrno());
670         return false;
671     }
672 
673     TRANS_LOGD(TRANS_STREAM, "send out..., streamType=%{public}d, len=%{public}zd", streamType_, len);
674     return true;
675 }
676 
SetOption(int type,const StreamAttr & value)677 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
678 {
679     PrintOptionInfo(type, value);
680     auto it = optFuncMap_.find(type);
681     if (it == optFuncMap_.end()) {
682         TRANS_LOGW(TRANS_STREAM, "not found type=%{public}d", type);
683         return false;
684     }
685 
686     if (value.GetType() != it->second.valueType) {
687         TRANS_LOGW(TRANS_STREAM,
688             "type=%{public}d, valueType=%{public}d", value.GetType(), it->second.valueType);
689         return false;
690     }
691 
692     MySetFunc set = it->second.set;
693     if (set == nullptr) {
694         TRANS_LOGW(TRANS_STREAM, "set is nullptr");
695         return false;
696     }
697 
698     if (type == NON_BLOCK || type == TOS) {
699         return (this->*set)(static_cast<int>(streamFd_), value);
700     }
701 
702     auto outerIt = FILLP_TYPE_MAP.find(type);
703     if (outerIt != FILLP_TYPE_MAP.end()) {
704         return (this->*set)(outerIt->second, value);
705     }
706 
707     auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
708     if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
709         return (this->*set)(innerIt->second, value);
710     }
711 
712     return (this->*set)(static_cast<int>(type), value);
713 }
714 
GetOption(int type) const715 StreamAttr VtpStreamSocket::GetOption(int type) const
716 {
717     StreamAttr attr {};
718     auto it = optFuncMap_.find(type);
719     if (it != optFuncMap_.end()) {
720         MyGetFunc get = it->second.get;
721         if (get == nullptr) {
722             TRANS_LOGE(TRANS_STREAM, "Can not get option type=%{public}d", type);
723             return std::move(StreamAttr());
724         }
725         if (type == NON_BLOCK) {
726             attr = (this->*get)(static_cast<int>(streamFd_));
727         } else {
728             attr = (this->*get)(static_cast<int>(type));
729         }
730     }
731 
732     PrintOptionInfo(type, attr);
733     return attr;
734 }
735 
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)736 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
737 {
738     if (receiver == nullptr) {
739         TRANS_LOGW(TRANS_STREAM, "receiver is nullptr");
740         return false;
741     }
742 
743     std::lock_guard<std::mutex> guard(streamSocketLock_);
744     streamReceiver_ = receiver;
745     TRANS_LOGI(TRANS_STREAM, "set receiver success");
746     return true;
747 }
748 
InitVtpInstance(const std::string & pkgName)749 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
750 {
751     return vtpInstance_->InitVtp(pkgName);
752 }
753 
DestroyVtpInstance(const std::string & pkgName)754 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
755 {
756     TRANS_LOGD(TRANS_STREAM, "enter.");
757     vtpInstance_->DestroyVtp(pkgName);
758     TRANS_LOGD(TRANS_STREAM, "ok");
759 }
760 
CreateAndBindSocket(IpAndPort & local,bool isServer)761 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local, bool isServer)
762 {
763     localIpPort_ = local;
764     vtpInstance_->UpdateSocketStreamCount(true);
765     if (local.ip.empty()) {
766         TRANS_LOGE(TRANS_STREAM, "ip is empty");
767         return -1;
768     }
769 
770     int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
771     if (sockFd == -1) {
772         TRANS_LOGE(TRANS_STREAM, "FtSocket failed, errno=%{public}d", FtGetErrno());
773         return -1;
774     }
775     if (!isServer) {
776         TRANS_LOGI(TRANS_STREAM, "FtSocket set client, errno=%{public}d", FtGetErrno());
777         streamFd_ = sockFd;
778     }
779     SetDefaultConfig(sockFd);
780     if (local.port < 0 || local.port > MAX_PORT) {
781         return -1;
782     }
783 
784     // bind
785     sockaddr_in localSockAddr = { 0 };
786     char host[ADDR_MAX_SIZE] = { 0 };
787     localSockAddr.sin_family = AF_INET;
788     localSockAddr.sin_port = htons((short)local.port);
789     localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
790     localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
791     if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
792         TRANS_LOGE(TRANS_STREAM, "SetSocketBoundInner failed, errno=%{public}d", FtGetErrno());
793     }
794 
795     socklen_t localAddrLen = sizeof(localSockAddr);
796     int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
797     if (ret == -1) {
798         FtClose(sockFd);
799         TRANS_LOGE(TRANS_STREAM, "FtBind failed, errno=%{public}d", FtGetErrno());
800         return -1;
801     }
802 
803     // 获取port
804     ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
805     if (ret != ERR_OK) {
806         TRANS_LOGE(TRANS_STREAM, "getsockname error ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
807         FtClose(sockFd);
808         return -1;
809     }
810 
811     localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
812     local.port = localIpPort_.port;
813 
814     return sockFd;
815 }
816 
817 
EnableBwEstimationAlgo(int streamFd,bool isServer) const818 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
819 {
820 #ifdef FILLP_SUPPORT_BW_DET
821     int errBwDet;
822     if (isServer) {
823         int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
824         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
825             &enableBwDet, sizeof(enableBwDet));
826     } else {
827         int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
828         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
829             &enableBwDet, sizeof(enableBwDet));
830     }
831     if (errBwDet < 0) {
832         TRANS_LOGE(TRANS_STREAM,
833             "Fail to enable bandwidth estimation algorithm for streamFd=%{public}d, errno%{public}d",
834             streamFd, FtGetErrno());
835         return true;
836     } else {
837         TRANS_LOGE(TRANS_STREAM,
838             "Success to enable bandwidth estimation algorithm for stream=Fd%{public}d", streamFd);
839         return false;
840     }
841 #else
842     return true;
843 #endif
844 }
845 
EnableJitterDetectionAlgo(int streamFd) const846 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
847 {
848 #ifdef FILLP_SUPPORT_CQE
849     int32_t  enableCQE = FILLP_CQE_ENABLE;
850     auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
851     if (errCQE < 0) {
852         TRANS_LOGE(TRANS_STREAM,
853             "Fail to enable CQE algorithm for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
854         return true;
855     } else {
856         TRANS_LOGE(TRANS_STREAM,
857             "Success to enable CQE algorithm for streamFd=%{public}d", streamFd);
858         return false;
859     }
860 #else
861     return true;
862 #endif
863 }
864 
EnableDirectlySend(int streamFd) const865 bool VtpStreamSocket::EnableDirectlySend(int streamFd) const
866 {
867     int32_t enable = 1;
868     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
869     if (ret < 0) {
870         TRANS_LOGE(TRANS_STREAM,
871             "Fail to enable direct send for streamFd=%{public}d, rrno=%{public}d", streamFd, FtGetErrno());
872         return false;
873     }
874     TRANS_LOGI(TRANS_STREAM, "Success to enable direct send for streamFd=%{public}d", streamFd);
875     return true;
876 }
877 
EnableSemiReliable(int streamFd) const878 bool VtpStreamSocket::EnableSemiReliable(int streamFd) const
879 {
880     int32_t enable = 1;
881     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
882     if (ret < 0) {
883         TRANS_LOGE(TRANS_STREAM,
884             "Fail to enable direct send for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
885         return false;
886     }
887     TRANS_LOGI(TRANS_STREAM, "Success to enable semi reliable for streamFd=%{public}d", streamFd);
888     return true;
889 }
890 
RegisterMetricCallback(bool isServer)891 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
892 {
893     VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
894     auto self = this->GetSelf();
895     VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
896     int regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
897     int value = 1;
898     auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
899     if (err < 0) {
900         TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
901         return;
902     }
903     TRANS_LOGD(TRANS_STREAM, "FtSetSockOpt start success");
904     if (isServer) {
905         if (regStatisticsRet == 0) {
906             TRANS_LOGI(TRANS_STREAM,
907                 "Success to register the stream callback function at server side. streamFd=%{public}d", streamFd_);
908         } else {
909             TRANS_LOGE(TRANS_STREAM,
910                 "Fail to register the stream callback function at server side. streamFd=%{public}d, errno=%{public}d",
911                 streamFd_, regStatisticsRet);
912         }
913     } else {
914         if (regStatisticsRet == 0) {
915             TRANS_LOGI(TRANS_STREAM,
916                 "Success to register the stream callback function at client side. streamFd=%{public}d", streamFd_);
917         } else {
918             TRANS_LOGE(TRANS_STREAM,
919                 "Fail to register the stream callback function at client side. streamFd=%{public}d, errno=%{public}d",
920                 streamFd_, regStatisticsRet);
921         }
922     }
923 }
924 
Accept()925 bool VtpStreamSocket::Accept()
926 {
927     TRANS_LOGD(TRANS_STREAM, "enter.");
928     auto fd = FtAccept(listenFd_, nullptr, nullptr);
929     TRANS_LOGI(TRANS_STREAM, "accept streamFd=%{public}d", fd);
930     if (fd == -1) {
931         TRANS_LOGE(TRANS_STREAM, "errno=%{public}d", FtGetErrno());
932         return false;
933     }
934 
935     sockaddr remoteAddr {};
936     socklen_t remoteAddrLen = sizeof(remoteAddr);
937     auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
938     if (ret != ERR_OK) {
939         TRANS_LOGE(TRANS_STREAM, "get name failed, fd=%{public}d", fd);
940         FtClose(fd);
941         return false;
942     }
943 
944     char host[ADDR_MAX_SIZE] = { 0 };
945     if (remoteAddr.sa_family == AF_INET) {
946         auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
947         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
948         remoteIpPort_.port = v4Addr->sin_port;
949     } else {
950         auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
951         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
952         remoteIpPort_.port = v6Addr->sin6_port;
953     }
954 
955     TRANS_LOGD(TRANS_STREAM, "Accept a client remotePort=%{public}d", remoteIpPort_.port);
956 
957     if (SetSocketEpollMode(fd) != ERR_OK) {
958         TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, fd=%{public}d", fd);
959         FtClose(fd);
960         return false;
961     }
962 
963     std::lock_guard<std::mutex> guard(streamSocketLock_);
964     streamFd_ = fd;
965     configCv_.notify_all();
966 
967     if (streamReceiver_ != nullptr) {
968         TRANS_LOGI(TRANS_STREAM, "notify stream connected!");
969         streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
970     }
971 
972     bool isServer = true;
973     RegisterMetricCallback(isServer); /* register the callback function */
974     /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
975 #ifdef FILLP_SUPPORT_BW_DET
976     EnableBwEstimationAlgo(streamFd_, isServer);
977 #endif
978 #ifdef FILLP_SUPPORT_CQE
979     EnableJitterDetectionAlgo(streamFd_);
980 #endif
981 
982     TRANS_LOGI(TRANS_STREAM, "accept success!");
983     return true;
984 }
985 
EpollTimeout(int fd,int timeout)986 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
987 {
988     struct SpungeEpollEvent events[MAX_EPOLL_NUM];
989     (void)memset_s(events, sizeof(events), 0, sizeof(events));
990     while (true) {
991         FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
992         if (fdNum <= 0) {
993             TRANS_LOGE(TRANS_STREAM,
994                 "FtEpollWait failed, ret=%{public}d, errno=%{public}d", fdNum, FtGetErrno());
995             return -FtGetErrno();
996         }
997 
998         for (FILLP_INT i = 0; i < fdNum; i++) {
999             if (events[i].data.fd != fd) {
1000                 continue;
1001             }
1002 
1003             if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
1004                 TRANS_LOGE(TRANS_STREAM,
1005                     "EpollTimeout, something may be wrong in this socket, fd=%{public}d, events=%{public}u", fd,
1006                     (unsigned int)events[i].events);
1007                 return -1;
1008             }
1009 
1010             if (events[i].events & SPUNGE_EPOLLIN) {
1011                 return SOFTBUS_OK;
1012             }
1013         }
1014     }
1015 }
1016 
SetSocketEpollMode(int fd)1017 int VtpStreamSocket::SetSocketEpollMode(int fd)
1018 {
1019     if (!SetNonBlockMode(fd, StreamAttr(true))) {
1020         TRANS_LOGE(TRANS_STREAM, "SetNonBlockMode failed, errno=%{public}d", FtGetErrno());
1021         return -1;
1022     }
1023 
1024     struct SpungeEpollEvent event = {0};
1025     event.events = SPUNGE_EPOLLIN;
1026     event.data.fd = fd;
1027 
1028     auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
1029     if (ret != ERR_OK) {
1030         TRANS_LOGE(TRANS_STREAM, "FtEpollCtl failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
1031         return ret;
1032     }
1033 
1034     TRANS_LOGD(TRANS_STREAM, "SetNonBlockMode success");
1035     return SOFTBUS_OK;
1036 }
1037 
InsertBufferLength(int num,int length,uint8_t * output) const1038 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
1039 {
1040     for (int i = 0; i < length; i++) {
1041         output[length - 1 - i] = static_cast<unsigned int>(
1042             ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1043     }
1044 }
1045 
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1046 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1047 {
1048     std::unique_ptr<IStream> stream = nullptr;
1049     switch (streamType_) {
1050         case VIDEO_SLICE_STREAM:
1051             TRANS_LOGD(TRANS_STREAM, "do not support VIDEO_SLICE_STREAM streamType=%{public}d", streamType_);
1052             break;
1053         case COMMON_VIDEO_STREAM:
1054         case COMMON_AUDIO_STREAM:
1055             TRANS_LOGD(TRANS_STREAM,
1056                 "streamType=%{public}d, seqNum=%{public}d, streamId=%{public}d",
1057                 streamType_, info.seqNum, info.streamId);
1058             stream = IStream::MakeCommonStream(data, info);
1059             break;
1060         case RAW_STREAM:
1061             TRANS_LOGD(TRANS_STREAM, "streamType=%{public}d", streamType_);
1062             stream = IStream::MakeRawStream(data, info);
1063             break;
1064         default:
1065             TRANS_LOGE(TRANS_STREAM, "do not support streamType=%{public}d", streamType_);
1066             break;
1067     }
1068     if (stream == nullptr) {
1069         TRANS_LOGE(TRANS_STREAM, "IStream construct error");
1070         return nullptr;
1071     }
1072 
1073     return stream;
1074 }
1075 
RecvStreamLen()1076 int32_t VtpStreamSocket::RecvStreamLen()
1077 {
1078     int32_t hdrSize = FRAME_HEADER_LEN;
1079     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1080         hdrSize = streamHdrSize_;
1081     }
1082 
1083     int32_t len = -1;
1084     int32_t timeout = -1;
1085     auto buffer = std::make_unique<char[]>(hdrSize);
1086     if (EpollTimeout(streamFd_, timeout) == 0) {
1087         do {
1088             len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1089         } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1090     }
1091     TRANS_LOGD(TRANS_STREAM, "recv frame header, len=%{public}d, scene=%{public}d", len, scene_);
1092 
1093     if (len <= 0) {
1094         TRANS_LOGE(TRANS_STREAM, "len invalid, len=%{public}d", len);
1095         return -1;
1096     }
1097 
1098     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1099         std::lock_guard<std::mutex> guard(streamSocketLock_);
1100         if (streamReceiver_ != nullptr) {
1101             return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1102         }
1103     }
1104 
1105     return ntohl(*reinterpret_cast<int *>(buffer.get()));
1106 }
1107 
ProcessCommonDataStream(std::unique_ptr<char[]> & dataBuffer,int32_t & dataLength,std::unique_ptr<char[]> & extBuffer,int32_t & extLen,StreamFrameInfo & info)1108 bool VtpStreamSocket::ProcessCommonDataStream(std::unique_ptr<char[]> &dataBuffer,
1109     int32_t &dataLength, std::unique_ptr<char[]> &extBuffer, int32_t &extLen, StreamFrameInfo &info)
1110 {
1111     TRANS_LOGD(TRANS_STREAM, "recv common stream");
1112     int32_t decryptedLength = dataLength;
1113     auto decryptedBuffer = std::move(dataBuffer);
1114 
1115     int32_t plainDataLength = decryptedLength - GetEncryptOverhead();
1116     if (plainDataLength <= 0) {
1117         TRANS_LOGE(TRANS_STREAM, "Decrypt failed, invalid decryptedLen=%{public}d", decryptedLength);
1118         return false;
1119     }
1120     std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1121     ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1122     if (decLen != plainDataLength) {
1123         TRANS_LOGE(TRANS_STREAM,
1124             "Decrypt failed, dataLen=%{public}d, decryptedLen=%{public}zd", plainDataLength, decLen);
1125         return false;
1126     }
1127     auto header = plainData.get();
1128     StreamDepacketizer decode(streamType_);
1129     if (plainDataLength < static_cast<int32_t>(sizeof(CommonHeader))) {
1130         TRANS_LOGE(TRANS_STREAM,
1131             "failed, plainDataLen=%{public}d, CommonHeader=%{public}zu", plainDataLength, sizeof(CommonHeader));
1132         return false;
1133     }
1134     decode.DepacketizeHeader(header);
1135 
1136     auto buffer = plainData.get() + sizeof(CommonHeader);
1137     decode.DepacketizeBuffer(buffer, plainDataLength - sizeof(CommonHeader));
1138 
1139     extBuffer = decode.GetUserExt();
1140     extLen = decode.GetUserExtSize();
1141     info = decode.GetFrameInfo();
1142     dataBuffer = decode.GetData();
1143     dataLength = decode.GetDataLength();
1144     if (dataLength <= 0) {
1145         TRANS_LOGE(TRANS_STREAM, "common depacketize error, dataLen=%{public}d", dataLength);
1146         return false;
1147     }
1148     return true;
1149 }
1150 
1151 int64_t g_diffTime = 0;
1152 int64_t g_startTime = 0;
1153 int64_t g_lastTime = 0;
1154 const int TIME_OUT = 50;
1155 const int WAIT_UPLOAD_TIME = 300000;
1156 
DoStreamRecv()1157 void VtpStreamSocket::DoStreamRecv()
1158 {
1159     while (isStreamRecv_) {
1160         std::unique_ptr<char[]> dataBuffer = nullptr;
1161         std::unique_ptr<char[]> extBuffer = nullptr;
1162         int32_t extLen = 0;
1163         StreamFrameInfo info = {};
1164         if (g_startTime != 0) {
1165             g_diffTime = GetSoftbusRecordTimeMillis() - g_startTime;
1166             if (g_diffTime > TIME_OUT && GetSoftbusRecordTimeMillis() - g_lastTime >= WAIT_UPLOAD_TIME) {
1167                 g_lastTime = GetSoftbusRecordTimeMillis();
1168                 TRANS_LOGW(TRANS_STREAM, "recv stream difftime = %{public}" PRIu64, g_diffTime);
1169                 TransEventExtra extra = {
1170                     .costTime = (int32_t)g_diffTime,
1171                 };
1172                 TRANS_EVENT(EVENT_SCENE_TRANS_RECV_STREAM, EVENT_STAGE_TRANS_RECV_STREAM, extra);
1173             }
1174         }
1175         g_startTime = GetSoftbusRecordTimeMillis();
1176         TRANS_LOGD(TRANS_STREAM, "recv stream");
1177         int32_t dataLength = VtpStreamSocket::RecvStreamLen();
1178         if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1179             TRANS_LOGE(TRANS_STREAM, "read frame length error, dataLength=%{public}d", dataLength);
1180             break;
1181         }
1182         TRANS_LOGD(TRANS_STREAM,
1183             "recv a new frame, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1184         dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1185 
1186         if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1187             if (!ProcessCommonDataStream(dataBuffer, dataLength, extBuffer, extLen, info)) {
1188                 break;
1189             }
1190         }
1191 
1192         StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1193         std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1194         if (stream == nullptr) {
1195             TRANS_LOGE(TRANS_STREAM, "MakeStreamData failed, stream is null");
1196             break;
1197         }
1198 
1199         TRANS_LOGD(TRANS_STREAM,
1200             "recv frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1201 
1202         if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1203             std::lock_guard<std::mutex> guard(streamSocketLock_);
1204             if (streamReceiver_ != nullptr) {
1205                 streamReceiver_->OnStreamReceived(std::move(stream));
1206                 continue;
1207             }
1208         }
1209 
1210         PutStream(std::move(stream));
1211         TRANS_LOGD(TRANS_STREAM, "put frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1212     }
1213     TRANS_LOGI(TRANS_STREAM, "recv thread exit");
1214 }
1215 
RecvStream(int32_t dataLength)1216 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int32_t dataLength)
1217 {
1218     auto buffer = std::make_unique<char[]>(dataLength);
1219     int32_t recvLen = 0;
1220     while (recvLen < dataLength) {
1221         int32_t ret = -1;
1222         int32_t timeout = -1;
1223 
1224         if (EpollTimeout(streamFd_, timeout) == 0) {
1225             do {
1226                 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1227             } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1228         }
1229 
1230         if (ret == -1) {
1231             TRANS_LOGE(TRANS_STREAM, "read frame failed, errno=%{public}d", FtGetErrno());
1232             return nullptr;
1233         }
1234 
1235         recvLen += ret;
1236     }
1237     return std::unique_ptr<char[]>(buffer.release());
1238 }
1239 
SetDefaultConfig(int fd)1240 void VtpStreamSocket::SetDefaultConfig(int fd)
1241 {
1242     if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1243         TRANS_LOGW(TRANS_STREAM, "SetIpTos failed");
1244     }
1245     // Set Fillp direct sending
1246     if (!EnableDirectlySend(fd)) {
1247         TRANS_LOGW(TRANS_STREAM, "EnableDirectlySend failed");
1248     }
1249 
1250     if (!EnableSemiReliable(fd)) {
1251         TRANS_LOGW(TRANS_STREAM, "EnableSemiReliable failed");
1252     }
1253     // Set Fillp Differentiated Transmission
1254     FILLP_BOOL enable = 1;
1255     if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1256         TRANS_LOGW(TRANS_STREAM, "Set differ transmit failed");
1257     }
1258 
1259     if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1260         TRANS_LOGW(TRANS_STREAM, "Set recv buff failed");
1261     }
1262 
1263     if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1264         TRANS_LOGW(TRANS_STREAM, "Set send buff failed");
1265     }
1266 
1267     if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1268         TRANS_LOGW(TRANS_STREAM, "Set recv cache failed");
1269     }
1270 
1271     if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1272         TRANS_LOGW(TRANS_STREAM, "Set send cache failed");
1273     }
1274 }
1275 
SetIpTos(int fd,const StreamAttr & tos)1276 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1277 {
1278     auto tmp = tos.GetIntValue();
1279     if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1280         TRANS_LOGE(TRANS_STREAM, "SetIpTos wrong! fd=%{public}d, errno=%{public}d", fd, FtGetErrno());
1281         return false;
1282     }
1283 
1284     TRANS_LOGD(TRANS_STREAM, "Success to set ip tos: fd=%{public}d, tos=%{public}d", fd, tmp);
1285     return true;
1286 }
1287 
GetIpTos(int type) const1288 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1289 {
1290     static_cast<void>(type);
1291     int tos;
1292     int size = sizeof(tos);
1293 
1294     if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1295         TRANS_LOGE(TRANS_STREAM, "FtGetSockOpt errno=%{public}d", FtGetErrno());
1296         return std::move(StreamAttr());
1297     }
1298 
1299     return std::move(StreamAttr(tos));
1300 }
1301 
GetStreamSocketFd(int type) const1302 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1303 {
1304     static_cast<void>(type);
1305     return std::move(StreamAttr(streamFd_));
1306 }
1307 
GetListenSocketFd(int type) const1308 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1309 {
1310     static_cast<void>(type);
1311     return std::move(StreamAttr(listenFd_));
1312 }
1313 
SetSocketBoundInner(int fd,std::string ip) const1314 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1315 {
1316     auto boundIp = (ip.empty()) ? localIpPort_.ip : ip;
1317     struct ifaddrs *ifList = nullptr;
1318     if (getifaddrs(&ifList) < 0) {
1319         TRANS_LOGE(TRANS_STREAM,
1320             "get interface address return errno=%{public}d, strerror=%{public}s", errno, strerror(errno));
1321         return false;
1322     }
1323 
1324     struct ifaddrs *ifa = nullptr;
1325     for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1326         if (ifa->ifa_addr == nullptr) {
1327             continue;
1328         }
1329         if (ifa->ifa_addr->sa_family != AF_INET) {
1330             continue;
1331         }
1332 
1333         char host[ADDR_MAX_SIZE] = { 0 };
1334         std::string devName(ifa->ifa_name);
1335         if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1336             host, ADDR_MAX_SIZE)) == 0) {
1337             TRANS_LOGI(TRANS_STREAM, "current use interface to bind to socket. ifName=%{public}s", ifa->ifa_name);
1338             auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1339             if (err < 0) {
1340                 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
1341                 freeifaddrs(ifList);
1342                 return false;
1343             }
1344             break;
1345         }
1346     }
1347     freeifaddrs(ifList);
1348 
1349     return true;
1350 }
1351 
SetSocketBindToDevices(int type,const StreamAttr & ip)1352 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1353 {
1354     static_cast<void>(type);
1355     auto tmp = ip.GetStrValue();
1356     auto boundIp = (tmp.empty()) ? localIpPort_.ip : tmp;
1357     return SetSocketBoundInner(streamFd_, boundIp);
1358 }
1359 
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1360 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1361 {
1362     std::unique_lock<std::mutex> lock(streamSocketLock_);
1363     if (streamFd_ == -1) {
1364         configCv_.wait(lock, [this] { return streamFd_ != -1; });
1365     }
1366     TRANS_LOGD(TRANS_STREAM, "set vtp stack config, streamFd=%{public}d", streamFd_);
1367     return SetVtpStackConfig(type, value);
1368 }
1369 
SetVtpStackConfig(int type,const StreamAttr & value)1370 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1371 {
1372     if (streamFd_ == -1) {
1373         TRANS_LOGI(TRANS_STREAM, "set vtp stack config when streamFd is legal, type=%{public}d", type);
1374         auto self = GetSelf();
1375         std::thread([self, type, value]() {
1376             const std::string threadName = "OS_setVtpCfg";
1377             pthread_setname_np(pthread_self(), threadName.c_str());
1378             self->SetVtpStackConfigDelayed(type, value);
1379             }).detach();
1380         return true;
1381     }
1382 
1383     if (value.GetType() == INT_TYPE) {
1384         int intVal = value.GetIntValue();
1385         int ret = FtConfigSet(type, &intVal, &streamFd_);
1386         if (ret != SOFTBUS_OK) {
1387             TRANS_LOGE(TRANS_STREAM,
1388                 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1389             return false;
1390         }
1391 
1392         TRANS_LOGI(TRANS_STREAM,
1393             "setVtpConfig success, type=%{public}d, streamFd=%{public}d, value=%{public}d", type, streamFd_, intVal);
1394         return true;
1395     }
1396 
1397     if (value.GetType() == BOOL_TYPE) {
1398         bool flag = value.GetBoolValue();
1399         int ret = FtConfigSet(type, &flag, &streamFd_);
1400         if (ret != SOFTBUS_OK) {
1401             TRANS_LOGE(TRANS_STREAM,
1402                 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1403             return false;
1404         }
1405 
1406         TRANS_LOGI(TRANS_STREAM,
1407             "setVtpConfig success, streamFd=%{public}d, flag=%{public}d, type=%{public}d", type, streamFd_, flag);
1408         return true;
1409     }
1410 
1411     TRANS_LOGE(TRANS_STREAM, "UNKNOWN TYPE!");
1412     return false;
1413 }
1414 
GetVtpStackConfig(int type) const1415 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1416 {
1417     int intVal = -1;
1418     int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1419     int ret = FtConfigGet(type, &intVal, &configFd);
1420     if (ret != SOFTBUS_OK) {
1421         TRANS_LOGE(TRANS_STREAM,
1422             "FtConfigGet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1423         return std::move(StreamAttr());
1424     }
1425 
1426     int valType = ValueType::UNKNOWN;
1427     for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1428         if (it->second != type) {
1429             continue;
1430         }
1431 
1432         valType = optFuncMap_.at(it->first).valueType;
1433         break;
1434     }
1435 
1436     if (valType != ValueType::UNKNOWN) {
1437         for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1438             if (it->second != type) {
1439                 continue;
1440             }
1441 
1442             valType = optFuncMap_.at(it->first).valueType;
1443             break;
1444         }
1445     }
1446 
1447     if (valType == BOOL_TYPE) {
1448         return std::move(StreamAttr(!!intVal));
1449     }
1450 
1451     return std::move(StreamAttr(intVal));
1452 }
1453 
SetNonBlockMode(int fd,const StreamAttr & value)1454 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1455 {
1456     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1457     if (flags < 0) {
1458         TRANS_LOGE(TRANS_STREAM, "failed to get FtFcntl, flags=%{public}d", flags);
1459         flags = 0;
1460     }
1461     bool nonBlock = value.GetBoolValue();
1462 
1463     flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1464         static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1465 
1466     FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1467     if (res < 0) {
1468         TRANS_LOGE(TRANS_STREAM, "failed to set FtFcntl, res=%{public}d", res);
1469         return false;
1470     }
1471 
1472     TRANS_LOGI(TRANS_STREAM, "Successfully to set fd=%{public}d, nonBlock=%{public}d", fd, nonBlock);
1473     return true;
1474 }
1475 
GetNonBlockMode(int fd) const1476 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1477 {
1478     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1479     if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1480         return std::move(StreamAttr(true));
1481     }
1482 
1483     return std::move(StreamAttr(false));
1484 }
1485 
GetIp(int type) const1486 StreamAttr VtpStreamSocket::GetIp(int type) const
1487 {
1488     if (type == LOCAL_IP) {
1489         return std::move(StreamAttr(localIpPort_.ip));
1490     }
1491 
1492     return std::move(StreamAttr(remoteIpPort_.ip));
1493 }
1494 
GetPort(int type) const1495 StreamAttr VtpStreamSocket::GetPort(int type) const
1496 {
1497     if (type == LOCAL_PORT) {
1498         return std::move(StreamAttr(localIpPort_.port));
1499     }
1500     return std::move(StreamAttr(remoteIpPort_.port));
1501 }
1502 
SetStreamType(int type,const StreamAttr & value)1503 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1504 {
1505     if (type != STREAM_TYPE_INT) {
1506         return false;
1507     }
1508 
1509     streamType_ = value.GetIntValue();
1510     return true;
1511 }
1512 
GetStreamType(int type) const1513 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1514 {
1515     if (type != STREAM_TYPE_INT) {
1516         return std::move(StreamAttr());
1517     }
1518 
1519     return std::move(StreamAttr(streamType_));
1520 }
1521 
SetStreamScene(int type,const StreamAttr & value)1522 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1523 {
1524     static_cast<void>(type);
1525     if (value.GetType() != INT_TYPE) {
1526         TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1527         return false;
1528     }
1529     scene_ = value.GetIntValue();
1530     TRANS_LOGI(TRANS_STREAM, "Set scene=%{public}d", scene_);
1531     return true;
1532 }
1533 
SetStreamHeaderSize(int type,const StreamAttr & value)1534 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1535 {
1536     static_cast<void>(type);
1537     if (value.GetType() != INT_TYPE) {
1538         TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1539         return false;
1540     }
1541     streamHdrSize_ = value.GetIntValue();
1542     TRANS_LOGI(TRANS_STREAM, "Set headerSize=%{public}d", streamHdrSize_);
1543     return true;
1544 }
1545 
NotifyStreamListener()1546 void VtpStreamSocket::NotifyStreamListener()
1547 {
1548     while (isStreamRecv_) {
1549         int streamNum = GetStreamNum();
1550         if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1551             TRANS_LOGW(TRANS_STREAM, "Too many data in receiver, streamNum=%{public}d", streamNum);
1552         }
1553 
1554         auto stream = TakeStream();
1555         if (stream == nullptr) {
1556             TRANS_LOGE(TRANS_STREAM, "Pop stream failed");
1557             break;
1558         }
1559 
1560         std::lock_guard<std::mutex> guard(streamSocketLock_);
1561         if (streamReceiver_ != nullptr) {
1562             TRANS_LOGD(TRANS_STREAM, "notify listener");
1563             streamReceiver_->OnStreamReceived(std::move(stream));
1564             TRANS_LOGD(TRANS_STREAM, "notify listener done.");
1565         }
1566     }
1567     TRANS_LOGI(TRANS_STREAM, "notify thread exit");
1568 }
1569 
GetEncryptOverhead() const1570 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1571 {
1572     return OVERHEAD_LEN;
1573 }
1574 
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1575 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1576 {
1577     if (in == nullptr || out == nullptr) {
1578         TRANS_LOGE(TRANS_STREAM, "param invalid");
1579         return SOFTBUS_INVALID_PARAM;
1580     }
1581     AesGcmCipherKey cipherKey = {0};
1582 
1583     if (inLen - OVERHEAD_LEN > outLen) {
1584         TRANS_LOGE(TRANS_STREAM, "Encrypt invalid para.");
1585         return SOFTBUS_INVALID_PARAM;
1586     }
1587 
1588     cipherKey.keyLen = SESSION_KEY_LENGTH;
1589     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1590         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1591         return SOFTBUS_MEM_ERR;
1592     }
1593 
1594     int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1595     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1596     if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1597         TRANS_LOGE(TRANS_STREAM, "Encrypt Data fail. ret=%{public}d", ret);
1598         return SOFTBUS_ENCRYPT_ERR;
1599     }
1600     return outLen;
1601 }
1602 
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1603 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1604 {
1605     if (in == nullptr || out == nullptr) {
1606         TRANS_LOGE(TRANS_STREAM, "param invalid");
1607         return SOFTBUS_INVALID_PARAM;
1608     }
1609     AesGcmCipherKey cipherKey = {0};
1610 
1611     if (inLen - OVERHEAD_LEN > outLen) {
1612         TRANS_LOGE(TRANS_STREAM, "Decrypt invalid para.");
1613         return SOFTBUS_INVALID_PARAM;
1614     }
1615 
1616     cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1617     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1618         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1619         return SOFTBUS_MEM_ERR;
1620     }
1621     int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1622     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1623     if (ret != SOFTBUS_OK) {
1624         TRANS_LOGE(TRANS_STREAM, "Decrypt Data fail. ret=%{public}d ", ret);
1625         return SOFTBUS_DECRYPT_ERR;
1626     }
1627 
1628     return outLen;
1629 }
1630 
VtpSetSocketMultiLayerPacked(int fd,OnFrameEvt * cb,const void * para)1631 static int32_t VtpSetSocketMultiLayerPacked(int fd, OnFrameEvt *cb, const void *para)
1632 {
1633     ClientEnhanceFuncList *pfnClientEnhanceFuncList = ClientEnhanceFuncListGet();
1634     if (pfnClientEnhanceFuncList->vtpSetSocketMultiLayer == nullptr) {
1635         TRANS_LOGE(TRANS_STREAM, "Func vtpSetSocketMultiLayer is not registered");
1636         return SOFTBUS_FUNC_NOT_SUPPORT;
1637     }
1638     return pfnClientEnhanceFuncList->vtpSetSocketMultiLayer(fd, cb, para);
1639 }
1640 
SetMultiLayer(const void * para)1641 int32_t VtpStreamSocket::SetMultiLayer(const void *para)
1642 {
1643     int fd = GetStreamSocketFd(FD).GetIntValue();
1644     return VtpSetSocketMultiLayerPacked(fd, &onStreamEvtCb_, para);
1645 }
1646 
CreateServerProcessThread()1647 void VtpStreamSocket::CreateServerProcessThread()
1648 {
1649     auto self = this->GetSelf();
1650     std::thread([self]() {
1651         const std::string threadName = "OS_sntfStmLsn";
1652         pthread_setname_np(pthread_self(), threadName.c_str());
1653         self->NotifyStreamListener();
1654         }).detach();
1655 
1656     std::thread([self]() {
1657         const std::string threadName = "OS_sdstyStmSkt";
1658         pthread_setname_np(pthread_self(), threadName.c_str());
1659         if (!self->Accept()) {
1660             self->DestroyStreamSocket();
1661             return;
1662         }
1663         self->DoStreamRecv();
1664         self->DestroyStreamSocket();
1665         }).detach();
1666 
1667     bool &isDestroyed = isDestroyed_;
1668     std::thread([self, &isDestroyed]() {
1669         const std::string threadName = "OS_sfillStatic";
1670         pthread_setname_np(pthread_self(), threadName.c_str());
1671         while (!isDestroyed) {
1672             self->FillpAppStatistics();
1673             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1674         }
1675         }).detach();
1676 }
1677 
CreateClientProcessThread()1678 void VtpStreamSocket::CreateClientProcessThread()
1679 {
1680     auto self = this->GetSelf();
1681     std::thread([self]() {
1682         const std::string threadName = "OS_cntfStmLsn";
1683         pthread_setname_np(pthread_self(), threadName.c_str());
1684         self->NotifyStreamListener();
1685         }).detach();
1686 
1687     std::thread([self]() {
1688         const std::string threadName = "OS_cdstyStmSkt";
1689         pthread_setname_np(pthread_self(), threadName.c_str());
1690         self->DoStreamRecv();
1691         self->DestroyStreamSocket();
1692         }).detach();
1693 
1694     bool &isDestroyed = isDestroyed_;
1695     std::thread([self, &isDestroyed]() {
1696         const std::string threadName = "OS_cfillStatic";
1697         pthread_setname_np(pthread_self(), threadName.c_str());
1698         while (!isDestroyed) {
1699             self->FillpAppStatistics();
1700             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1701         }
1702         }).detach();
1703 }
1704 } // namespace SoftBus
1705 } // namespace Communication
1706