1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "vtp_stream_socket.h"
17
18 #include <ifaddrs.h>
19 #include <thread>
20
21 #include "fillpinc.h"
22 #include "g_enhance_sdk_func.h"
23 #include "raw_stream_data.h"
24 #include "session.h"
25 #include "legacy/softbus_hisysevt_transreporter.h"
26 #include "softbus_adapter_crypto.h"
27 #include "softbus_adapter_socket.h"
28 #include "stream_depacketizer.h"
29 #include "stream_packetizer.h"
30 #include "trans_event.h"
31
32 namespace Communication {
33 namespace SoftBus {
/* NOTE(review): presumably a logging switch; no reader is visible in this part of the file — confirm. */
bool g_logOn = false;
const int32_t FEED_BACK_PERIOD = 1;  /* feedback period of fillp stream traffic statistics is 1s */
/* Unit factors used when converting the statistics timestamp to milliseconds. */
const int32_t MS_PER_SECOND = 1000;
const int32_t US_PER_MS = 1000;
/* Largest local port value accepted by CreateAndBindSocket(). */
static constexpr int32_t MAX_PORT = 65535;
39
40 namespace {
PrintOptionInfo(int type,const StreamAttr & value)41 void PrintOptionInfo(int type, const StreamAttr &value)
42 {
43 switch (value.GetType()) {
44 case INT_TYPE:
45 TRANS_LOGI(TRANS_STREAM,
46 "Int option: type=%{public}d, value=%{public}d", type, value.GetIntValue());
47 break;
48 case BOOL_TYPE:
49 TRANS_LOGI(TRANS_STREAM,
50 "Bool option: type=%{public}d, value=%{public}d", type, value.GetBoolValue());
51 break;
52 case STRING_TYPE:
53 TRANS_LOGD(TRANS_STREAM,
54 "String option: type=%{public}d, value=%{public}s", type, value.GetStrValue().c_str());
55 break;
56 default:
57 TRANS_LOGE(TRANS_STREAM, "Wrong StreamAttr!");
58 (void)type;
59 }
60 }
61 } // namespace
/* Shared VTP instance handle, obtained once during static initialisation. */
std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();

/* fd -> reference to a registered mutex; accessed under g_streamSocketLockMapLock_. */
std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
std::mutex VtpStreamSocket::g_streamSocketLockMapLock_;
/* fd -> registered socket object; accessed under g_streamSocketMapLock_. */
std::map<int, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
std::mutex VtpStreamSocket::g_streamSocketMapLock_;
68
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)69 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo* frameInfo,
70 const Communication::SoftBus::StreamFrameInfo* streamFrameInfo)
71 {
72 frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
73 frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
74 frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
75 frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
76 frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
77 frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
78 }
79
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)80 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
81 {
82 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
83 if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
84 TRANS_LOGE(TRANS_STREAM, "streamsocketlock for the fd already exists. fd=%{public}d", fd);
85 return;
86 }
87
88 g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
89 }
90
AddStreamSocketListener(int fd,std::shared_ptr<VtpStreamSocket> streamreceiver)91 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
92 {
93 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
94 if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
95 TRANS_LOGE(TRANS_STREAM, "streamreceiver for the fd already exists. fd=%{public}d", fd);
96 return;
97 }
98
99 g_streamSocketMap.emplace(std::pair<int, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
100 }
101
RemoveStreamSocketLock(int fd)102 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
103 {
104 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
105 if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
106 g_streamSocketLockMap.erase(fd);
107 TRANS_LOGI(TRANS_STREAM, "Remove streamsocketlock for the fd success. fd=%{public}d", fd);
108 } else {
109 TRANS_LOGE(TRANS_STREAM,
110 "Streamsocketlock for the fd not exist in the map. fd=%{public}d", fd);
111 }
112 }
113
RemoveStreamSocketListener(int fd)114 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
115 {
116 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
117 if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
118 g_streamSocketMap.erase(fd);
119 TRANS_LOGI(TRANS_STREAM, "Remove streamreceiver for the fd success. fd=%{public}d", fd);
120 } else {
121 TRANS_LOGE(TRANS_STREAM, "Streamreceiver for the fd not exist in the map. fd=%{public}d", fd);
122 }
123 }
124
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)125 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
126 {
127 OptionFunc fun = {
128 valueType, set, get
129 };
130 optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
131 }
132
VtpStreamSocket()133 VtpStreamSocket::VtpStreamSocket()
134 {
135 InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
136 InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
137 InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
138 InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
139 InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
140 InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
141 InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
142 InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
143 InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
144 InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
145 InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
146 InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
147 InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
148 &VtpStreamSocket::GetVtpStackConfig);
149 InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
150 &VtpStreamSocket::GetVtpStackConfig);
151 InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
152 &VtpStreamSocket::GetVtpStackConfig);
153 InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
154 &VtpStreamSocket::GetVtpStackConfig);
155 InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
156 &VtpStreamSocket::GetVtpStackConfig);
157 InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
158 &VtpStreamSocket::GetVtpStackConfig);
159 InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
160 &VtpStreamSocket::GetVtpStackConfig);
161 InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
162 &VtpStreamSocket::GetVtpStackConfig);
163 InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
164 &VtpStreamSocket::GetVtpStackConfig);
165 InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
166 &VtpStreamSocket::GetVtpStackConfig);
167 InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
168 &VtpStreamSocket::GetVtpStackConfig);
169 InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
170 &VtpStreamSocket::GetVtpStackConfig);
171 InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
172 InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
173 InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
174 InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
175
176 scene_ = UNKNOWN_SCENE;
177 }
178
/* Destructor: this body only emits a warning-level trace; it closes no descriptor and frees no buffer itself. */
VtpStreamSocket::~VtpStreamSocket()
{
    TRANS_LOGW(TRANS_STREAM, "~VtpStreamSocket");
}
183
/*
 * Returns a shared_ptr to this object via shared_from_this().
 * NOTE(review): requires the object to be owned by a shared_ptr already — confirm at call sites.
 */
std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
{
    return shared_from_this();
}
188
HandleFillpFrameStats(int fd,const FtEventCbkInfo * info)189 int VtpStreamSocket::HandleFillpFrameStats(int fd, const FtEventCbkInfo *info)
190 {
191 if (info == nullptr) {
192 TRANS_LOGE(TRANS_STREAM, "stats info is nullptr");
193 return SOFTBUS_INVALID_PARAM;
194 }
195 StreamSendStats stats = {};
196 if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
197 sizeof(info->info.frameSendStats)) != EOK) {
198 TRANS_LOGE(TRANS_STREAM, "streamStats info memcpy fail");
199 return SOFTBUS_MEM_ERR;
200 }
201
202 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
203 auto itListener = g_streamSocketMap.find(fd);
204 if (itListener != g_streamSocketMap.end()) {
205 if (itListener->second->streamReceiver_ != nullptr) {
206 TRANS_LOGD(TRANS_STREAM, "OnFrameStats enter");
207 itListener->second->streamReceiver_->OnFrameStats(&stats);
208 } else {
209 TRANS_LOGE(TRANS_STREAM, "streamReceiver_ is nullptr");
210 }
211 } else {
212 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the fd is empty in the map. fd=%{public}d", fd);
213 }
214 return SOFTBUS_OK;
215 }
216
HandleRipplePolicy(int fd,const FtEventCbkInfo * info)217 int VtpStreamSocket::HandleRipplePolicy(int fd, const FtEventCbkInfo *info)
218 {
219 if (info == nullptr) {
220 TRANS_LOGE(TRANS_STREAM, "policy info is nullptr");
221 return SOFTBUS_INVALID_PARAM;
222 }
223 TrafficStats stats;
224 (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
225 if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
226 sizeof(info->info.trafficData.stats)) != EOK) {
227 TRANS_LOGE(TRANS_STREAM, "RipplePolicy info memcpy fail");
228 return SOFTBUS_MEM_ERR;
229 }
230 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
231 auto itListener = g_streamSocketMap.find(fd);
232 if (itListener != g_streamSocketMap.end()) {
233 if (itListener->second->streamReceiver_ != nullptr) {
234 TRANS_LOGI(TRANS_STREAM, "OnRippleStats enter");
235 itListener->second->streamReceiver_->OnRippleStats(&stats);
236 } else {
237 TRANS_LOGE(TRANS_STREAM, "OnRippleStats streamReceiver_ is nullptr");
238 }
239 } else {
240 TRANS_LOGE(TRANS_STREAM,
241 "OnRippleStats streamReceiver for the fd is empty in the map. fd=%{public}d", fd);
242 }
243 return SOFTBUS_OK;
244 }
245
HandleFillpFrameEvt(int fd,const FtEventCbkInfo * info)246 int VtpStreamSocket::HandleFillpFrameEvt(int fd, const FtEventCbkInfo *info)
247 {
248 if (info == nullptr) {
249 TRANS_LOGE(TRANS_STREAM, "fd is %{public}d, info is nullptr", fd);
250 return SOFTBUS_INVALID_PARAM;
251 }
252 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
253 auto itListener = g_streamSocketMap.find(fd);
254 if (itListener != g_streamSocketMap.end()) {
255 return itListener->second->HandleFillpFrameEvtInner(fd, info);
256 } else {
257 TRANS_LOGE(TRANS_STREAM, "OnFillpFrameEvt for the fd is empty in the map. fd=%{public}d", fd);
258 }
259 return SOFTBUS_OK;
260 }
261
HandleVtpFrameEvtPacked(int fd,OnFrameEvt cb,const FtEventCbkInfo * info)262 static int HandleVtpFrameEvtPacked(int fd, OnFrameEvt cb, const FtEventCbkInfo *info)
263 {
264 ClientEnhanceFuncList *pfnClientEnhanceFuncList = ClientEnhanceFuncListGet();
265 if (pfnClientEnhanceFuncList->handleVtpFrameEvt == nullptr) {
266 TRANS_LOGE(TRANS_STREAM, "func handleVtpFrameEvt is not registered");
267 return SOFTBUS_FUNC_NOT_SUPPORT;
268 }
269 return pfnClientEnhanceFuncList->handleVtpFrameEvt(fd, cb, info);
270 }
271
HandleFillpFrameEvtInner(int fd,const FtEventCbkInfo * info)272 int VtpStreamSocket::HandleFillpFrameEvtInner(int fd, const FtEventCbkInfo *info)
273 {
274 if (onStreamEvtCb_ != nullptr) {
275 TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ enter");
276 return HandleVtpFrameEvtPacked(fd, onStreamEvtCb_, info);
277 } else {
278 TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ is nullptr");
279 }
280 return SOFTBUS_OK;
281 }
282
283 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int fd,const FtEventCbkInfo * info,QosTv * metricList)284 void VtpStreamSocket::FillSupportDet(int fd, const FtEventCbkInfo *info, QosTv *metricList)
285 {
286 if (info == nullptr || metricList == nullptr) {
287 TRANS_LOGE(TRANS_STREAM, "info or metricList is nullptr");
288 return;
289 }
290 if (info->evt == FT_EVT_BW_DET) {
291 TRANS_LOGI(TRANS_STREAM,
292 "[Metric Return]: Fillp bandwidth information of socket fd=%{public}d is returned", fd);
293 TRANS_LOGI(TRANS_STREAM,
294 "[Metric Return]: Changed amount of current available bandwidth=%{public}d", info->info.bwInfo.bwStat);
295 TRANS_LOGI(TRANS_STREAM,
296 "[Metric Return]: Current bandwidth for receiving data rate=%{public}d kbps", info->info.bwInfo.rate);
297 metricList->type = BANDWIDTH_ESTIMATE_VALUE;
298 metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
299 metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
300 }
301 if (info->evt == FT_EVT_JITTER_DET) {
302 TRANS_LOGI(TRANS_STREAM,
303 "[Metric Return]: Fillp connection quality information of socket fd=%{public}d is returned", fd);
304 TRANS_LOGI(TRANS_STREAM,
305 "[Metric Return]: Predeicted network condition jitterLevel=%{public}d", info->info.jitterInfo.jitterLevel);
306 TRANS_LOGI(TRANS_STREAM,
307 "[Metric Return]: Current available receiving buffer time=%{public}d ms", info->info.jitterInfo.bufferTime);
308 metricList->type = JITTER_DETECTION_VALUE;
309 metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
310 metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
311 }
312 }
313 #endif
314
IsVtpFrameSentEvtPacked(const FtEventCbkInfo * info)315 static bool IsVtpFrameSentEvtPacked(const FtEventCbkInfo *info)
316 {
317 ClientEnhanceFuncList *pfnClientEnhanceFuncList = ClientEnhanceFuncListGet();
318 if (pfnClientEnhanceFuncList->isVtpFrameSentEvt == nullptr) {
319 TRANS_LOGE(TRANS_STREAM, "Func isVtpFrameSentEvt is not registered");
320 return false;
321 }
322 return pfnClientEnhanceFuncList->isVtpFrameSentEvt(info);
323 }
324
325 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int fd,const FtEventCbkInfo * info)326 int VtpStreamSocket::FillpStatistics(int fd, const FtEventCbkInfo *info)
327 {
328 if (info == nullptr || fd < 0) {
329 TRANS_LOGE(TRANS_STREAM, "param invalid fd is %{public}d", fd);
330 return SOFTBUS_INVALID_PARAM;
331 }
332 if (info->evt == FT_EVT_FRAME_STATS) {
333 TRANS_LOGI(TRANS_STREAM, "recv fillp frame stats");
334 return HandleFillpFrameStats(fd, info);
335 } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
336 TRANS_LOGI(TRANS_STREAM, "recv fillp traffic data");
337 return HandleRipplePolicy(fd, info);
338 } else if (IsVtpFrameSentEvtPacked(info)) {
339 TRANS_LOGI(TRANS_STREAM, "fd %{public}d recv fillp frame send evt", fd);
340 return HandleFillpFrameEvt(fd, info);
341 }
342 #ifdef FILLP_SUPPORT_BW_DET
343 if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
344 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
345 int16_t tvCount = 1;
346 QosTv metricList = {};
347
348 FillSupportDet(fd, info, &metricList);
349 metricList.info.wifiChannelInfo = {};
350 metricList.info.frameStatusInfo = {};
351
352 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
353 auto itLock = g_streamSocketLockMap.find(fd);
354 if (itLock != g_streamSocketLockMap.end()) {
355 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
356 auto itListener = g_streamSocketMap.find(fd);
357 if (itListener != g_streamSocketMap.end()) {
358 if (itListener->second->OnQosEvent == nullptr) {
359 TRANS_LOGE(TRANS_STREAM, "OnQosEvent is empty");
360 return SOFTBUS_INVALID_PARAM;
361 }
362 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
363 const std::string threadName = "OS_qosEvent";
364 pthread_setname_np(pthread_self(), threadName.c_str());
365 std::lock_guard<std::mutex> guard(itLock->second);
366 itListener->second->OnQosEvent(eventId, tvCount, &metricList);
367 }).detach();
368 } else {
369 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for fd=%{public}d is empty in the map", fd);
370 }
371 } else {
372 TRANS_LOGE(TRANS_STREAM, "StreamSocketLock for fd=%{public}d is empty in the map", fd);
373 }
374 } else {
375 TRANS_LOGE(TRANS_STREAM,
376 "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
377 return -1;
378 }
379 #endif
380 return SOFTBUS_OK;
381 }
382
FillpAppStatistics()383 void VtpStreamSocket::FillpAppStatistics()
384 {
385 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
386 int16_t tvCount = 1;
387 QosTv metricList = {};
388 FillpStatisticsPcb fillpPcbStats = {};
389 SoftBusSysTime fillpStatsGetTime = {0};
390
391 int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
392 SoftBusGetTime(&fillpStatsGetTime);
393 if (getStatisticsRet == 0) {
394 metricList.type = STREAM_TRAFFIC_STASTICS;
395 metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
396 MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
397 metricList.info.appStatistics.periodRecvBits =
398 static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
399 metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
400 metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
401 metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
402 metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
403 metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
404 metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
405 metricList.info.appStatistics.periodRecvPktLossHighPrecision =
406 fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
407 metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
408 metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
409 metricList.info.appStatistics.periodSendPktLossHighPrecision =
410 fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
411 metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
412 metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
413
414 TRANS_LOGD(TRANS_STREAM,
415 "Succeed to get fillp statistics information for streamfd=%{public}d", streamFd_);
416 TRANS_LOGD(TRANS_STREAM,
417 "[Metric Return]: periodRtt=%{public}d", fillpPcbStats.appFcStastics.periodRtt);
418
419 std::lock_guard<std::mutex> guard(streamSocketLock_);
420
421 if (streamReceiver_ != nullptr) {
422 TRANS_LOGD(TRANS_STREAM,
423 "[Metric Notify]: Fillp traffic statistics information of socket is notified. streamfd=%{public}d",
424 streamFd_);
425 streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
426 } else {
427 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the streamFd is empty. streamFd=%{public}d", streamFd_);
428 }
429 } else {
430 TRANS_LOGE(TRANS_STREAM,
431 "Fail to get fillp statistics information for the streamfd. streamfd=%{public}d, errno=%{public}d",
432 streamFd_, FtGetErrno());
433 }
434 }
435
CreateClient(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)436 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
437 {
438 int fd = CreateAndBindSocket(local, false);
439 if (fd == -1) {
440 TRANS_LOGE(TRANS_STREAM, "CreateAndBindSocket failed, errno=%{public}d", FtGetErrno());
441 DestroyStreamSocket();
442 return false;
443 }
444
445 sessionKey_.second = sessionKey.second;
446 if (sessionKey_.first == nullptr) {
447 sessionKey_.first = new uint8_t[sessionKey_.second];
448 }
449 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
450 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
451 return false;
452 }
453
454 streamType_ = streamType;
455 std::lock_guard<std::mutex> guard(streamSocketLock_);
456 streamFd_ = fd;
457 configCv_.notify_all();
458
459 TRANS_LOGI(TRANS_STREAM,
460 "Success to create a client socket. fd=%{public}d, streamType=%{public}d", fd, streamType);
461 return true;
462 }
463
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)464 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
465 std::pair<uint8_t*, uint32_t> sessionKey)
466 {
467 if (!CreateClient(local, streamType, sessionKey)) {
468 return false;
469 }
470 /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
471 #ifdef FILLP_SUPPORT_BW_DET
472 bool isServer = false;
473 EnableBwEstimationAlgo(streamFd_, isServer);
474 #endif
475
476 bool connectRet = Connect(remote);
477 if (connectRet) {
478 bool isServer = false;
479 RegisterMetricCallback(isServer); /* register the callback function */
480 }
481 return connectRet;
482 }
483
CreateServer(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)484 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
485 {
486 TRANS_LOGD(TRANS_STREAM, "enter.");
487 listenFd_ = CreateAndBindSocket(local, true);
488 if (listenFd_ == -1) {
489 TRANS_LOGE(TRANS_STREAM, "create listenFd failed, errno=%{public}d", FtGetErrno());
490 DestroyStreamSocket();
491 return false;
492 }
493
494 bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
495 if (ret) {
496 TRANS_LOGE(TRANS_STREAM, "FtListen failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
497 DestroyStreamSocket();
498 return false;
499 }
500
501 epollFd_ = FtEpollCreate();
502 if (epollFd_ < 0) {
503 TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
504 DestroyStreamSocket();
505 return false;
506 }
507 isStreamRecv_ = true;
508 streamType_ = streamType;
509 sessionKey_.second = sessionKey.second;
510 if (sessionKey_.first == nullptr) {
511 sessionKey_.first = new uint8_t[sessionKey_.second];
512 }
513 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
514 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
515 return false;
516 }
517
518 CreateServerProcessThread();
519 TRANS_LOGI(TRANS_STREAM,
520 "CreateServer end, listenFd=%{public}d, epollFd=%{public}d, streamType=%{public}d", listenFd_, epollFd_,
521 streamType_);
522 return true;
523 }
524
/*
 * Tears the socket down under streamSocketLock_: closes the listen, stream and epoll
 * descriptors that are open, unregisters the stream fd from both global maps, notifies the
 * receiver with STREAM_CLOSED and drops it, quits the stream buffer and decrements the
 * instance's socket stream count. A second call is a no-op because of isDestroyed_.
 */
void VtpStreamSocket::DestroyStreamSocket()
{
    TRANS_LOGD(TRANS_STREAM, "enter.");
    std::lock_guard<std::mutex> guard(streamSocketLock_);
    if (isDestroyed_) {
        TRANS_LOGI(TRANS_STREAM, "StreamSocket is already destroyed");
        return;
    }
    if (listenFd_ != -1) {
        TRANS_LOGI(TRANS_STREAM, "listenFd_ enter FtClose");
        FtClose(listenFd_);
        listenFd_ = -1;
    }

    if (streamFd_ != -1) {
        RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
        RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
        TRANS_LOGI(TRANS_STREAM, "streamFd_ enter FtClose");
        FtClose(streamFd_);
        streamFd_ = -1;
    }

    if (epollFd_ != -1) {
        TRANS_LOGI(TRANS_STREAM, "epollFd_ enter FtClose");
        FtClose(epollFd_);
        epollFd_ = -1;
    }

    /* The receiver is told about the close while streamSocketLock_ is still held. */
    if (streamReceiver_ != nullptr) {
        TRANS_LOGI(TRANS_STREAM, "DestroyStreamSocket receiver delete");
        streamReceiver_->OnStreamStatus(STREAM_CLOSED);
        streamReceiver_.reset();
    }

    QuitStreamBuffer();
    vtpInstance_->UpdateSocketStreamCount(false);
    isDestroyed_ = true;
    TRANS_LOGD(TRANS_STREAM, "ok");
}
564
Connect(const IpAndPort & remote)565 bool VtpStreamSocket::Connect(const IpAndPort &remote)
566 {
567 if (remote.ip.empty()) {
568 TRANS_LOGE(TRANS_STREAM, "remote addr error, ip is nullptr");
569 DestroyStreamSocket();
570 return false;
571 }
572
573 TRANS_LOGD(TRANS_STREAM,
574 "Connect to server remotePort=%{public}d", remote.port);
575 remoteIpPort_ = remote;
576
577 struct sockaddr_in remoteSockAddr;
578 remoteSockAddr.sin_family = AF_INET;
579 remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
580 remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
581
582 int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
583 if (ret != SOFTBUS_OK) {
584 TRANS_LOGE(TRANS_STREAM, "FtConnect failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
585 DestroyStreamSocket();
586 return false;
587 }
588
589 epollFd_ = FtEpollCreate();
590 if (epollFd_ < 0) {
591 TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
592 DestroyStreamSocket();
593 return false;
594 }
595
596 if (SetSocketEpollMode(streamFd_) != ERR_OK) {
597 TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, streamFd=%{public}d", streamFd_);
598 DestroyStreamSocket();
599 return false;
600 }
601 isStreamRecv_ = true;
602 TRANS_LOGI(TRANS_STREAM, "Success to connect remote, and create a thread to recv data.");
603
604 CreateClientProcessThread();
605 return true;
606 }
607
EncryptStreamPacket(std::unique_ptr<IStream> stream,std::unique_ptr<char[]> & data,ssize_t & len)608 bool VtpStreamSocket::EncryptStreamPacket(std::unique_ptr<IStream> stream, std::unique_ptr<char[]> &data, ssize_t &len)
609 {
610 StreamPacketizer packet(streamType_, std::move(stream));
611 auto plainData = packet.PacketizeStream();
612 if (plainData == nullptr) {
613 TRANS_LOGE(TRANS_STREAM, "PacketizeStream failed");
614 return false;
615 }
616 len = packet.GetPacketLen() + GetEncryptOverhead();
617 TRANS_LOGD(TRANS_STREAM, "packetLen=%{public}zd, encryptOverhead=%{public}zd",
618 packet.GetPacketLen(), GetEncryptOverhead());
619 data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
620 ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
621 if (encLen != len) {
622 TRANS_LOGE(TRANS_STREAM, "encrypted failed, dataLen=%{public}zd, encLen=%{public}zd", len, encLen);
623 return false;
624 }
625 InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
626 len += FRAME_HEADER_LEN;
627
628 return true;
629 }
630
Send(std::unique_ptr<IStream> stream)631 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
632 {
633 TRANS_LOGD(TRANS_STREAM, "send in... streamType=%{public}d, dataSize=%{public}zd, extSize=%{public}zd",
634 streamType_, stream->GetBufferLen(), stream->GetExtBufferLen());
635
636 if (!isBlocked_) {
637 isBlocked_ = true;
638 if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
639 TRANS_LOGE(TRANS_STREAM, "set non block mode fail");
640 return false;
641 }
642 }
643
644 int32_t ret = -1;
645 std::unique_ptr<char[]> data = nullptr;
646 ssize_t len = 0;
647
648 const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
649 if (streamFrameInfo == nullptr) {
650 TRANS_LOGE(TRANS_STREAM, "streamFrameInfo is null");
651 return false;
652 }
653 FrameInfo frameInfo;
654 ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
655
656 if (streamType_ == RAW_STREAM) {
657 data = stream->GetBuffer();
658 len = stream->GetBufferLen();
659
660 ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
661 } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
662 if (!EncryptStreamPacket(std::move(stream), data, len)) {
663 return false;
664 }
665 ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
666 }
667
668 if (ret == -1) {
669 TRANS_LOGE(TRANS_STREAM, "send failed, errno=%{public}d", FtGetErrno());
670 return false;
671 }
672
673 TRANS_LOGD(TRANS_STREAM, "send out..., streamType=%{public}d, len=%{public}zd", streamType_, len);
674 return true;
675 }
676
SetOption(int type,const StreamAttr & value)677 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
678 {
679 PrintOptionInfo(type, value);
680 auto it = optFuncMap_.find(type);
681 if (it == optFuncMap_.end()) {
682 TRANS_LOGW(TRANS_STREAM, "not found type=%{public}d", type);
683 return false;
684 }
685
686 if (value.GetType() != it->second.valueType) {
687 TRANS_LOGW(TRANS_STREAM,
688 "type=%{public}d, valueType=%{public}d", value.GetType(), it->second.valueType);
689 return false;
690 }
691
692 MySetFunc set = it->second.set;
693 if (set == nullptr) {
694 TRANS_LOGW(TRANS_STREAM, "set is nullptr");
695 return false;
696 }
697
698 if (type == NON_BLOCK || type == TOS) {
699 return (this->*set)(static_cast<int>(streamFd_), value);
700 }
701
702 auto outerIt = FILLP_TYPE_MAP.find(type);
703 if (outerIt != FILLP_TYPE_MAP.end()) {
704 return (this->*set)(outerIt->second, value);
705 }
706
707 auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
708 if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
709 return (this->*set)(innerIt->second, value);
710 }
711
712 return (this->*set)(static_cast<int>(type), value);
713 }
714
GetOption(int type) const715 StreamAttr VtpStreamSocket::GetOption(int type) const
716 {
717 StreamAttr attr {};
718 auto it = optFuncMap_.find(type);
719 if (it != optFuncMap_.end()) {
720 MyGetFunc get = it->second.get;
721 if (get == nullptr) {
722 TRANS_LOGE(TRANS_STREAM, "Can not get option type=%{public}d", type);
723 return std::move(StreamAttr());
724 }
725 if (type == NON_BLOCK) {
726 attr = (this->*get)(static_cast<int>(streamFd_));
727 } else {
728 attr = (this->*get)(static_cast<int>(type));
729 }
730 }
731
732 PrintOptionInfo(type, attr);
733 return attr;
734 }
735
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)736 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
737 {
738 if (receiver == nullptr) {
739 TRANS_LOGW(TRANS_STREAM, "receiver is nullptr");
740 return false;
741 }
742
743 std::lock_guard<std::mutex> guard(streamSocketLock_);
744 streamReceiver_ = receiver;
745 TRANS_LOGI(TRANS_STREAM, "set receiver success");
746 return true;
747 }
748
/* Forwards to InitVtp() of the shared VTP instance for pkgName and returns its result. */
bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
{
    return vtpInstance_->InitVtp(pkgName);
}
753
/* Forwards to DestroyVtp() of the shared VTP instance for pkgName, tracing entry and exit. */
void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
{
    TRANS_LOGD(TRANS_STREAM, "enter.");
    vtpInstance_->DestroyVtp(pkgName);
    TRANS_LOGD(TRANS_STREAM, "ok");
}
760
CreateAndBindSocket(IpAndPort & local,bool isServer)761 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local, bool isServer)
762 {
763 localIpPort_ = local;
764 vtpInstance_->UpdateSocketStreamCount(true);
765 if (local.ip.empty()) {
766 TRANS_LOGE(TRANS_STREAM, "ip is empty");
767 return -1;
768 }
769
770 int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
771 if (sockFd == -1) {
772 TRANS_LOGE(TRANS_STREAM, "FtSocket failed, errno=%{public}d", FtGetErrno());
773 return -1;
774 }
775 if (!isServer) {
776 TRANS_LOGI(TRANS_STREAM, "FtSocket set client, errno=%{public}d", FtGetErrno());
777 streamFd_ = sockFd;
778 }
779 SetDefaultConfig(sockFd);
780 if (local.port < 0 || local.port > MAX_PORT) {
781 return -1;
782 }
783
784 // bind
785 sockaddr_in localSockAddr = { 0 };
786 char host[ADDR_MAX_SIZE] = { 0 };
787 localSockAddr.sin_family = AF_INET;
788 localSockAddr.sin_port = htons((short)local.port);
789 localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
790 localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
791 if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
792 TRANS_LOGE(TRANS_STREAM, "SetSocketBoundInner failed, errno=%{public}d", FtGetErrno());
793 }
794
795 socklen_t localAddrLen = sizeof(localSockAddr);
796 int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
797 if (ret == -1) {
798 FtClose(sockFd);
799 TRANS_LOGE(TRANS_STREAM, "FtBind failed, errno=%{public}d", FtGetErrno());
800 return -1;
801 }
802
803 // 获取port
804 ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
805 if (ret != ERR_OK) {
806 TRANS_LOGE(TRANS_STREAM, "getsockname error ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
807 FtClose(sockFd);
808 return -1;
809 }
810
811 localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
812 local.port = localIpPort_.port;
813
814 return sockFd;
815 }
816
817
EnableBwEstimationAlgo(int streamFd,bool isServer) const818 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
819 {
820 #ifdef FILLP_SUPPORT_BW_DET
821 int errBwDet;
822 if (isServer) {
823 int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
824 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
825 &enableBwDet, sizeof(enableBwDet));
826 } else {
827 int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
828 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
829 &enableBwDet, sizeof(enableBwDet));
830 }
831 if (errBwDet < 0) {
832 TRANS_LOGE(TRANS_STREAM,
833 "Fail to enable bandwidth estimation algorithm for streamFd=%{public}d, errno%{public}d",
834 streamFd, FtGetErrno());
835 return true;
836 } else {
837 TRANS_LOGE(TRANS_STREAM,
838 "Success to enable bandwidth estimation algorithm for stream=Fd%{public}d", streamFd);
839 return false;
840 }
841 #else
842 return true;
843 #endif
844 }
845
EnableJitterDetectionAlgo(int streamFd) const846 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
847 {
848 #ifdef FILLP_SUPPORT_CQE
849 int32_t enableCQE = FILLP_CQE_ENABLE;
850 auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
851 if (errCQE < 0) {
852 TRANS_LOGE(TRANS_STREAM,
853 "Fail to enable CQE algorithm for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
854 return true;
855 } else {
856 TRANS_LOGE(TRANS_STREAM,
857 "Success to enable CQE algorithm for streamFd=%{public}d", streamFd);
858 return false;
859 }
860 #else
861 return true;
862 #endif
863 }
864
EnableDirectlySend(int streamFd) const865 bool VtpStreamSocket::EnableDirectlySend(int streamFd) const
866 {
867 int32_t enable = 1;
868 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
869 if (ret < 0) {
870 TRANS_LOGE(TRANS_STREAM,
871 "Fail to enable direct send for streamFd=%{public}d, rrno=%{public}d", streamFd, FtGetErrno());
872 return false;
873 }
874 TRANS_LOGI(TRANS_STREAM, "Success to enable direct send for streamFd=%{public}d", streamFd);
875 return true;
876 }
877
EnableSemiReliable(int streamFd) const878 bool VtpStreamSocket::EnableSemiReliable(int streamFd) const
879 {
880 int32_t enable = 1;
881 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
882 if (ret < 0) {
883 TRANS_LOGE(TRANS_STREAM,
884 "Fail to enable direct send for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
885 return false;
886 }
887 TRANS_LOGI(TRANS_STREAM, "Success to enable semi reliable for streamFd=%{public}d", streamFd);
888 return true;
889 }
890
RegisterMetricCallback(bool isServer)891 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
892 {
893 VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
894 auto self = this->GetSelf();
895 VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
896 int regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
897 int value = 1;
898 auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
899 if (err < 0) {
900 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
901 return;
902 }
903 TRANS_LOGD(TRANS_STREAM, "FtSetSockOpt start success");
904 if (isServer) {
905 if (regStatisticsRet == 0) {
906 TRANS_LOGI(TRANS_STREAM,
907 "Success to register the stream callback function at server side. streamFd=%{public}d", streamFd_);
908 } else {
909 TRANS_LOGE(TRANS_STREAM,
910 "Fail to register the stream callback function at server side. streamFd=%{public}d, errno=%{public}d",
911 streamFd_, regStatisticsRet);
912 }
913 } else {
914 if (regStatisticsRet == 0) {
915 TRANS_LOGI(TRANS_STREAM,
916 "Success to register the stream callback function at client side. streamFd=%{public}d", streamFd_);
917 } else {
918 TRANS_LOGE(TRANS_STREAM,
919 "Fail to register the stream callback function at client side. streamFd=%{public}d, errno=%{public}d",
920 streamFd_, regStatisticsRet);
921 }
922 }
923 }
924
Accept()925 bool VtpStreamSocket::Accept()
926 {
927 TRANS_LOGD(TRANS_STREAM, "enter.");
928 auto fd = FtAccept(listenFd_, nullptr, nullptr);
929 TRANS_LOGI(TRANS_STREAM, "accept streamFd=%{public}d", fd);
930 if (fd == -1) {
931 TRANS_LOGE(TRANS_STREAM, "errno=%{public}d", FtGetErrno());
932 return false;
933 }
934
935 sockaddr remoteAddr {};
936 socklen_t remoteAddrLen = sizeof(remoteAddr);
937 auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
938 if (ret != ERR_OK) {
939 TRANS_LOGE(TRANS_STREAM, "get name failed, fd=%{public}d", fd);
940 FtClose(fd);
941 return false;
942 }
943
944 char host[ADDR_MAX_SIZE] = { 0 };
945 if (remoteAddr.sa_family == AF_INET) {
946 auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
947 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
948 remoteIpPort_.port = v4Addr->sin_port;
949 } else {
950 auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
951 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
952 remoteIpPort_.port = v6Addr->sin6_port;
953 }
954
955 TRANS_LOGD(TRANS_STREAM, "Accept a client remotePort=%{public}d", remoteIpPort_.port);
956
957 if (SetSocketEpollMode(fd) != ERR_OK) {
958 TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, fd=%{public}d", fd);
959 FtClose(fd);
960 return false;
961 }
962
963 std::lock_guard<std::mutex> guard(streamSocketLock_);
964 streamFd_ = fd;
965 configCv_.notify_all();
966
967 if (streamReceiver_ != nullptr) {
968 TRANS_LOGI(TRANS_STREAM, "notify stream connected!");
969 streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
970 }
971
972 bool isServer = true;
973 RegisterMetricCallback(isServer); /* register the callback function */
974 /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
975 #ifdef FILLP_SUPPORT_BW_DET
976 EnableBwEstimationAlgo(streamFd_, isServer);
977 #endif
978 #ifdef FILLP_SUPPORT_CQE
979 EnableJitterDetectionAlgo(streamFd_);
980 #endif
981
982 TRANS_LOGI(TRANS_STREAM, "accept success!");
983 return true;
984 }
985
EpollTimeout(int fd,int timeout)986 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
987 {
988 struct SpungeEpollEvent events[MAX_EPOLL_NUM];
989 (void)memset_s(events, sizeof(events), 0, sizeof(events));
990 while (true) {
991 FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
992 if (fdNum <= 0) {
993 TRANS_LOGE(TRANS_STREAM,
994 "FtEpollWait failed, ret=%{public}d, errno=%{public}d", fdNum, FtGetErrno());
995 return -FtGetErrno();
996 }
997
998 for (FILLP_INT i = 0; i < fdNum; i++) {
999 if (events[i].data.fd != fd) {
1000 continue;
1001 }
1002
1003 if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
1004 TRANS_LOGE(TRANS_STREAM,
1005 "EpollTimeout, something may be wrong in this socket, fd=%{public}d, events=%{public}u", fd,
1006 (unsigned int)events[i].events);
1007 return -1;
1008 }
1009
1010 if (events[i].events & SPUNGE_EPOLLIN) {
1011 return SOFTBUS_OK;
1012 }
1013 }
1014 }
1015 }
1016
SetSocketEpollMode(int fd)1017 int VtpStreamSocket::SetSocketEpollMode(int fd)
1018 {
1019 if (!SetNonBlockMode(fd, StreamAttr(true))) {
1020 TRANS_LOGE(TRANS_STREAM, "SetNonBlockMode failed, errno=%{public}d", FtGetErrno());
1021 return -1;
1022 }
1023
1024 struct SpungeEpollEvent event = {0};
1025 event.events = SPUNGE_EPOLLIN;
1026 event.data.fd = fd;
1027
1028 auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
1029 if (ret != ERR_OK) {
1030 TRANS_LOGE(TRANS_STREAM, "FtEpollCtl failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
1031 return ret;
1032 }
1033
1034 TRANS_LOGD(TRANS_STREAM, "SetNonBlockMode success");
1035 return SOFTBUS_OK;
1036 }
1037
InsertBufferLength(int num,int length,uint8_t * output) const1038 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
1039 {
1040 for (int i = 0; i < length; i++) {
1041 output[length - 1 - i] = static_cast<unsigned int>(
1042 ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1043 }
1044 }
1045
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1046 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1047 {
1048 std::unique_ptr<IStream> stream = nullptr;
1049 switch (streamType_) {
1050 case VIDEO_SLICE_STREAM:
1051 TRANS_LOGD(TRANS_STREAM, "do not support VIDEO_SLICE_STREAM streamType=%{public}d", streamType_);
1052 break;
1053 case COMMON_VIDEO_STREAM:
1054 case COMMON_AUDIO_STREAM:
1055 TRANS_LOGD(TRANS_STREAM,
1056 "streamType=%{public}d, seqNum=%{public}d, streamId=%{public}d",
1057 streamType_, info.seqNum, info.streamId);
1058 stream = IStream::MakeCommonStream(data, info);
1059 break;
1060 case RAW_STREAM:
1061 TRANS_LOGD(TRANS_STREAM, "streamType=%{public}d", streamType_);
1062 stream = IStream::MakeRawStream(data, info);
1063 break;
1064 default:
1065 TRANS_LOGE(TRANS_STREAM, "do not support streamType=%{public}d", streamType_);
1066 break;
1067 }
1068 if (stream == nullptr) {
1069 TRANS_LOGE(TRANS_STREAM, "IStream construct error");
1070 return nullptr;
1071 }
1072
1073 return stream;
1074 }
1075
RecvStreamLen()1076 int32_t VtpStreamSocket::RecvStreamLen()
1077 {
1078 int32_t hdrSize = FRAME_HEADER_LEN;
1079 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1080 hdrSize = streamHdrSize_;
1081 }
1082
1083 int32_t len = -1;
1084 int32_t timeout = -1;
1085 auto buffer = std::make_unique<char[]>(hdrSize);
1086 if (EpollTimeout(streamFd_, timeout) == 0) {
1087 do {
1088 len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1089 } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1090 }
1091 TRANS_LOGD(TRANS_STREAM, "recv frame header, len=%{public}d, scene=%{public}d", len, scene_);
1092
1093 if (len <= 0) {
1094 TRANS_LOGE(TRANS_STREAM, "len invalid, len=%{public}d", len);
1095 return -1;
1096 }
1097
1098 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1099 std::lock_guard<std::mutex> guard(streamSocketLock_);
1100 if (streamReceiver_ != nullptr) {
1101 return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1102 }
1103 }
1104
1105 return ntohl(*reinterpret_cast<int *>(buffer.get()));
1106 }
1107
ProcessCommonDataStream(std::unique_ptr<char[]> & dataBuffer,int32_t & dataLength,std::unique_ptr<char[]> & extBuffer,int32_t & extLen,StreamFrameInfo & info)1108 bool VtpStreamSocket::ProcessCommonDataStream(std::unique_ptr<char[]> &dataBuffer,
1109 int32_t &dataLength, std::unique_ptr<char[]> &extBuffer, int32_t &extLen, StreamFrameInfo &info)
1110 {
1111 TRANS_LOGD(TRANS_STREAM, "recv common stream");
1112 int32_t decryptedLength = dataLength;
1113 auto decryptedBuffer = std::move(dataBuffer);
1114
1115 int32_t plainDataLength = decryptedLength - GetEncryptOverhead();
1116 if (plainDataLength <= 0) {
1117 TRANS_LOGE(TRANS_STREAM, "Decrypt failed, invalid decryptedLen=%{public}d", decryptedLength);
1118 return false;
1119 }
1120 std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1121 ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1122 if (decLen != plainDataLength) {
1123 TRANS_LOGE(TRANS_STREAM,
1124 "Decrypt failed, dataLen=%{public}d, decryptedLen=%{public}zd", plainDataLength, decLen);
1125 return false;
1126 }
1127 auto header = plainData.get();
1128 StreamDepacketizer decode(streamType_);
1129 if (plainDataLength < static_cast<int32_t>(sizeof(CommonHeader))) {
1130 TRANS_LOGE(TRANS_STREAM,
1131 "failed, plainDataLen=%{public}d, CommonHeader=%{public}zu", plainDataLength, sizeof(CommonHeader));
1132 return false;
1133 }
1134 decode.DepacketizeHeader(header);
1135
1136 auto buffer = plainData.get() + sizeof(CommonHeader);
1137 decode.DepacketizeBuffer(buffer, plainDataLength - sizeof(CommonHeader));
1138
1139 extBuffer = decode.GetUserExt();
1140 extLen = decode.GetUserExtSize();
1141 info = decode.GetFrameInfo();
1142 dataBuffer = decode.GetData();
1143 dataLength = decode.GetDataLength();
1144 if (dataLength <= 0) {
1145 TRANS_LOGE(TRANS_STREAM, "common depacketize error, dataLen=%{public}d", dataLength);
1146 return false;
1147 }
1148 return true;
1149 }
1150
// Timing state used by DoStreamRecv to report slow receive iterations.
// These are plain namespace-scope variables, not per-socket members.
int64_t g_diffTime { 0 };  // time between the start of the previous receive iteration and now
int64_t g_startTime { 0 }; // timestamp taken at the start of the latest receive iteration; 0 = none yet
int64_t g_lastTime { 0 };  // timestamp of the last slow-receive report
// A gap above TIME_OUT is reported, but not more often than once per WAIT_UPLOAD_TIME.
// Both are compared against GetSoftbusRecordTimeMillis() differences.
constexpr int TIME_OUT = 50;
constexpr int WAIT_UPLOAD_TIME = 300000;
1156
DoStreamRecv()1157 void VtpStreamSocket::DoStreamRecv()
1158 {
1159 while (isStreamRecv_) {
1160 std::unique_ptr<char[]> dataBuffer = nullptr;
1161 std::unique_ptr<char[]> extBuffer = nullptr;
1162 int32_t extLen = 0;
1163 StreamFrameInfo info = {};
1164 if (g_startTime != 0) {
1165 g_diffTime = GetSoftbusRecordTimeMillis() - g_startTime;
1166 if (g_diffTime > TIME_OUT && GetSoftbusRecordTimeMillis() - g_lastTime >= WAIT_UPLOAD_TIME) {
1167 g_lastTime = GetSoftbusRecordTimeMillis();
1168 TRANS_LOGW(TRANS_STREAM, "recv stream difftime = %{public}" PRIu64, g_diffTime);
1169 TransEventExtra extra = {
1170 .costTime = (int32_t)g_diffTime,
1171 };
1172 TRANS_EVENT(EVENT_SCENE_TRANS_RECV_STREAM, EVENT_STAGE_TRANS_RECV_STREAM, extra);
1173 }
1174 }
1175 g_startTime = GetSoftbusRecordTimeMillis();
1176 TRANS_LOGD(TRANS_STREAM, "recv stream");
1177 int32_t dataLength = VtpStreamSocket::RecvStreamLen();
1178 if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1179 TRANS_LOGE(TRANS_STREAM, "read frame length error, dataLength=%{public}d", dataLength);
1180 break;
1181 }
1182 TRANS_LOGD(TRANS_STREAM,
1183 "recv a new frame, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1184 dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1185
1186 if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1187 if (!ProcessCommonDataStream(dataBuffer, dataLength, extBuffer, extLen, info)) {
1188 break;
1189 }
1190 }
1191
1192 StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1193 std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1194 if (stream == nullptr) {
1195 TRANS_LOGE(TRANS_STREAM, "MakeStreamData failed, stream is null");
1196 break;
1197 }
1198
1199 TRANS_LOGD(TRANS_STREAM,
1200 "recv frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1201
1202 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1203 std::lock_guard<std::mutex> guard(streamSocketLock_);
1204 if (streamReceiver_ != nullptr) {
1205 streamReceiver_->OnStreamReceived(std::move(stream));
1206 continue;
1207 }
1208 }
1209
1210 PutStream(std::move(stream));
1211 TRANS_LOGD(TRANS_STREAM, "put frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1212 }
1213 TRANS_LOGI(TRANS_STREAM, "recv thread exit");
1214 }
1215
RecvStream(int32_t dataLength)1216 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int32_t dataLength)
1217 {
1218 auto buffer = std::make_unique<char[]>(dataLength);
1219 int32_t recvLen = 0;
1220 while (recvLen < dataLength) {
1221 int32_t ret = -1;
1222 int32_t timeout = -1;
1223
1224 if (EpollTimeout(streamFd_, timeout) == 0) {
1225 do {
1226 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1227 } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1228 }
1229
1230 if (ret == -1) {
1231 TRANS_LOGE(TRANS_STREAM, "read frame failed, errno=%{public}d", FtGetErrno());
1232 return nullptr;
1233 }
1234
1235 recvLen += ret;
1236 }
1237 return std::unique_ptr<char[]>(buffer.release());
1238 }
1239
SetDefaultConfig(int fd)1240 void VtpStreamSocket::SetDefaultConfig(int fd)
1241 {
1242 if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1243 TRANS_LOGW(TRANS_STREAM, "SetIpTos failed");
1244 }
1245 // Set Fillp direct sending
1246 if (!EnableDirectlySend(fd)) {
1247 TRANS_LOGW(TRANS_STREAM, "EnableDirectlySend failed");
1248 }
1249
1250 if (!EnableSemiReliable(fd)) {
1251 TRANS_LOGW(TRANS_STREAM, "EnableSemiReliable failed");
1252 }
1253 // Set Fillp Differentiated Transmission
1254 FILLP_BOOL enable = 1;
1255 if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1256 TRANS_LOGW(TRANS_STREAM, "Set differ transmit failed");
1257 }
1258
1259 if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1260 TRANS_LOGW(TRANS_STREAM, "Set recv buff failed");
1261 }
1262
1263 if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1264 TRANS_LOGW(TRANS_STREAM, "Set send buff failed");
1265 }
1266
1267 if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1268 TRANS_LOGW(TRANS_STREAM, "Set recv cache failed");
1269 }
1270
1271 if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1272 TRANS_LOGW(TRANS_STREAM, "Set send cache failed");
1273 }
1274 }
1275
SetIpTos(int fd,const StreamAttr & tos)1276 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1277 {
1278 auto tmp = tos.GetIntValue();
1279 if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1280 TRANS_LOGE(TRANS_STREAM, "SetIpTos wrong! fd=%{public}d, errno=%{public}d", fd, FtGetErrno());
1281 return false;
1282 }
1283
1284 TRANS_LOGD(TRANS_STREAM, "Success to set ip tos: fd=%{public}d, tos=%{public}d", fd, tmp);
1285 return true;
1286 }
1287
GetIpTos(int type) const1288 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1289 {
1290 static_cast<void>(type);
1291 int tos;
1292 int size = sizeof(tos);
1293
1294 if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1295 TRANS_LOGE(TRANS_STREAM, "FtGetSockOpt errno=%{public}d", FtGetErrno());
1296 return std::move(StreamAttr());
1297 }
1298
1299 return std::move(StreamAttr(tos));
1300 }
1301
GetStreamSocketFd(int type) const1302 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1303 {
1304 static_cast<void>(type);
1305 return std::move(StreamAttr(streamFd_));
1306 }
1307
GetListenSocketFd(int type) const1308 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1309 {
1310 static_cast<void>(type);
1311 return std::move(StreamAttr(listenFd_));
1312 }
1313
SetSocketBoundInner(int fd,std::string ip) const1314 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1315 {
1316 auto boundIp = (ip.empty()) ? localIpPort_.ip : ip;
1317 struct ifaddrs *ifList = nullptr;
1318 if (getifaddrs(&ifList) < 0) {
1319 TRANS_LOGE(TRANS_STREAM,
1320 "get interface address return errno=%{public}d, strerror=%{public}s", errno, strerror(errno));
1321 return false;
1322 }
1323
1324 struct ifaddrs *ifa = nullptr;
1325 for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1326 if (ifa->ifa_addr == nullptr) {
1327 continue;
1328 }
1329 if (ifa->ifa_addr->sa_family != AF_INET) {
1330 continue;
1331 }
1332
1333 char host[ADDR_MAX_SIZE] = { 0 };
1334 std::string devName(ifa->ifa_name);
1335 if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1336 host, ADDR_MAX_SIZE)) == 0) {
1337 TRANS_LOGI(TRANS_STREAM, "current use interface to bind to socket. ifName=%{public}s", ifa->ifa_name);
1338 auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1339 if (err < 0) {
1340 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
1341 freeifaddrs(ifList);
1342 return false;
1343 }
1344 break;
1345 }
1346 }
1347 freeifaddrs(ifList);
1348
1349 return true;
1350 }
1351
SetSocketBindToDevices(int type,const StreamAttr & ip)1352 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1353 {
1354 static_cast<void>(type);
1355 auto tmp = ip.GetStrValue();
1356 auto boundIp = (tmp.empty()) ? localIpPort_.ip : tmp;
1357 return SetSocketBoundInner(streamFd_, boundIp);
1358 }
1359
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1360 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1361 {
1362 std::unique_lock<std::mutex> lock(streamSocketLock_);
1363 if (streamFd_ == -1) {
1364 configCv_.wait(lock, [this] { return streamFd_ != -1; });
1365 }
1366 TRANS_LOGD(TRANS_STREAM, "set vtp stack config, streamFd=%{public}d", streamFd_);
1367 return SetVtpStackConfig(type, value);
1368 }
1369
SetVtpStackConfig(int type,const StreamAttr & value)1370 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1371 {
1372 if (streamFd_ == -1) {
1373 TRANS_LOGI(TRANS_STREAM, "set vtp stack config when streamFd is legal, type=%{public}d", type);
1374 auto self = GetSelf();
1375 std::thread([self, type, value]() {
1376 const std::string threadName = "OS_setVtpCfg";
1377 pthread_setname_np(pthread_self(), threadName.c_str());
1378 self->SetVtpStackConfigDelayed(type, value);
1379 }).detach();
1380 return true;
1381 }
1382
1383 if (value.GetType() == INT_TYPE) {
1384 int intVal = value.GetIntValue();
1385 int ret = FtConfigSet(type, &intVal, &streamFd_);
1386 if (ret != SOFTBUS_OK) {
1387 TRANS_LOGE(TRANS_STREAM,
1388 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1389 return false;
1390 }
1391
1392 TRANS_LOGI(TRANS_STREAM,
1393 "setVtpConfig success, type=%{public}d, streamFd=%{public}d, value=%{public}d", type, streamFd_, intVal);
1394 return true;
1395 }
1396
1397 if (value.GetType() == BOOL_TYPE) {
1398 bool flag = value.GetBoolValue();
1399 int ret = FtConfigSet(type, &flag, &streamFd_);
1400 if (ret != SOFTBUS_OK) {
1401 TRANS_LOGE(TRANS_STREAM,
1402 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1403 return false;
1404 }
1405
1406 TRANS_LOGI(TRANS_STREAM,
1407 "setVtpConfig success, streamFd=%{public}d, flag=%{public}d, type=%{public}d", type, streamFd_, flag);
1408 return true;
1409 }
1410
1411 TRANS_LOGE(TRANS_STREAM, "UNKNOWN TYPE!");
1412 return false;
1413 }
1414
GetVtpStackConfig(int type) const1415 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1416 {
1417 int intVal = -1;
1418 int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1419 int ret = FtConfigGet(type, &intVal, &configFd);
1420 if (ret != SOFTBUS_OK) {
1421 TRANS_LOGE(TRANS_STREAM,
1422 "FtConfigGet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1423 return std::move(StreamAttr());
1424 }
1425
1426 int valType = ValueType::UNKNOWN;
1427 for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1428 if (it->second != type) {
1429 continue;
1430 }
1431
1432 valType = optFuncMap_.at(it->first).valueType;
1433 break;
1434 }
1435
1436 if (valType != ValueType::UNKNOWN) {
1437 for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1438 if (it->second != type) {
1439 continue;
1440 }
1441
1442 valType = optFuncMap_.at(it->first).valueType;
1443 break;
1444 }
1445 }
1446
1447 if (valType == BOOL_TYPE) {
1448 return std::move(StreamAttr(!!intVal));
1449 }
1450
1451 return std::move(StreamAttr(intVal));
1452 }
1453
SetNonBlockMode(int fd,const StreamAttr & value)1454 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1455 {
1456 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1457 if (flags < 0) {
1458 TRANS_LOGE(TRANS_STREAM, "failed to get FtFcntl, flags=%{public}d", flags);
1459 flags = 0;
1460 }
1461 bool nonBlock = value.GetBoolValue();
1462
1463 flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1464 static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1465
1466 FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1467 if (res < 0) {
1468 TRANS_LOGE(TRANS_STREAM, "failed to set FtFcntl, res=%{public}d", res);
1469 return false;
1470 }
1471
1472 TRANS_LOGI(TRANS_STREAM, "Successfully to set fd=%{public}d, nonBlock=%{public}d", fd, nonBlock);
1473 return true;
1474 }
1475
GetNonBlockMode(int fd) const1476 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1477 {
1478 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1479 if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1480 return std::move(StreamAttr(true));
1481 }
1482
1483 return std::move(StreamAttr(false));
1484 }
1485
GetIp(int type) const1486 StreamAttr VtpStreamSocket::GetIp(int type) const
1487 {
1488 if (type == LOCAL_IP) {
1489 return std::move(StreamAttr(localIpPort_.ip));
1490 }
1491
1492 return std::move(StreamAttr(remoteIpPort_.ip));
1493 }
1494
GetPort(int type) const1495 StreamAttr VtpStreamSocket::GetPort(int type) const
1496 {
1497 if (type == LOCAL_PORT) {
1498 return std::move(StreamAttr(localIpPort_.port));
1499 }
1500 return std::move(StreamAttr(remoteIpPort_.port));
1501 }
1502
SetStreamType(int type,const StreamAttr & value)1503 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1504 {
1505 if (type != STREAM_TYPE_INT) {
1506 return false;
1507 }
1508
1509 streamType_ = value.GetIntValue();
1510 return true;
1511 }
1512
GetStreamType(int type) const1513 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1514 {
1515 if (type != STREAM_TYPE_INT) {
1516 return std::move(StreamAttr());
1517 }
1518
1519 return std::move(StreamAttr(streamType_));
1520 }
1521
SetStreamScene(int type,const StreamAttr & value)1522 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1523 {
1524 static_cast<void>(type);
1525 if (value.GetType() != INT_TYPE) {
1526 TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1527 return false;
1528 }
1529 scene_ = value.GetIntValue();
1530 TRANS_LOGI(TRANS_STREAM, "Set scene=%{public}d", scene_);
1531 return true;
1532 }
1533
SetStreamHeaderSize(int type,const StreamAttr & value)1534 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1535 {
1536 static_cast<void>(type);
1537 if (value.GetType() != INT_TYPE) {
1538 TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1539 return false;
1540 }
1541 streamHdrSize_ = value.GetIntValue();
1542 TRANS_LOGI(TRANS_STREAM, "Set headerSize=%{public}d", streamHdrSize_);
1543 return true;
1544 }
1545
NotifyStreamListener()1546 void VtpStreamSocket::NotifyStreamListener()
1547 {
1548 while (isStreamRecv_) {
1549 int streamNum = GetStreamNum();
1550 if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1551 TRANS_LOGW(TRANS_STREAM, "Too many data in receiver, streamNum=%{public}d", streamNum);
1552 }
1553
1554 auto stream = TakeStream();
1555 if (stream == nullptr) {
1556 TRANS_LOGE(TRANS_STREAM, "Pop stream failed");
1557 break;
1558 }
1559
1560 std::lock_guard<std::mutex> guard(streamSocketLock_);
1561 if (streamReceiver_ != nullptr) {
1562 TRANS_LOGD(TRANS_STREAM, "notify listener");
1563 streamReceiver_->OnStreamReceived(std::move(stream));
1564 TRANS_LOGD(TRANS_STREAM, "notify listener done.");
1565 }
1566 }
1567 TRANS_LOGI(TRANS_STREAM, "notify thread exit");
1568 }
1569
GetEncryptOverhead() const1570 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1571 {
1572 return OVERHEAD_LEN;
1573 }
1574
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1575 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1576 {
1577 if (in == nullptr || out == nullptr) {
1578 TRANS_LOGE(TRANS_STREAM, "param invalid");
1579 return SOFTBUS_INVALID_PARAM;
1580 }
1581 AesGcmCipherKey cipherKey = {0};
1582
1583 if (inLen - OVERHEAD_LEN > outLen) {
1584 TRANS_LOGE(TRANS_STREAM, "Encrypt invalid para.");
1585 return SOFTBUS_INVALID_PARAM;
1586 }
1587
1588 cipherKey.keyLen = SESSION_KEY_LENGTH;
1589 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1590 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1591 return SOFTBUS_MEM_ERR;
1592 }
1593
1594 int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1595 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1596 if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1597 TRANS_LOGE(TRANS_STREAM, "Encrypt Data fail. ret=%{public}d", ret);
1598 return SOFTBUS_ENCRYPT_ERR;
1599 }
1600 return outLen;
1601 }
1602
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1603 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1604 {
1605 if (in == nullptr || out == nullptr) {
1606 TRANS_LOGE(TRANS_STREAM, "param invalid");
1607 return SOFTBUS_INVALID_PARAM;
1608 }
1609 AesGcmCipherKey cipherKey = {0};
1610
1611 if (inLen - OVERHEAD_LEN > outLen) {
1612 TRANS_LOGE(TRANS_STREAM, "Decrypt invalid para.");
1613 return SOFTBUS_INVALID_PARAM;
1614 }
1615
1616 cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1617 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1618 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1619 return SOFTBUS_MEM_ERR;
1620 }
1621 int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1622 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1623 if (ret != SOFTBUS_OK) {
1624 TRANS_LOGE(TRANS_STREAM, "Decrypt Data fail. ret=%{public}d ", ret);
1625 return SOFTBUS_DECRYPT_ERR;
1626 }
1627
1628 return outLen;
1629 }
1630
VtpSetSocketMultiLayerPacked(int fd,OnFrameEvt * cb,const void * para)1631 static int32_t VtpSetSocketMultiLayerPacked(int fd, OnFrameEvt *cb, const void *para)
1632 {
1633 ClientEnhanceFuncList *pfnClientEnhanceFuncList = ClientEnhanceFuncListGet();
1634 if (pfnClientEnhanceFuncList->vtpSetSocketMultiLayer == nullptr) {
1635 TRANS_LOGE(TRANS_STREAM, "Func vtpSetSocketMultiLayer is not registered");
1636 return SOFTBUS_FUNC_NOT_SUPPORT;
1637 }
1638 return pfnClientEnhanceFuncList->vtpSetSocketMultiLayer(fd, cb, para);
1639 }
1640
SetMultiLayer(const void * para)1641 int32_t VtpStreamSocket::SetMultiLayer(const void *para)
1642 {
1643 int fd = GetStreamSocketFd(FD).GetIntValue();
1644 return VtpSetSocketMultiLayerPacked(fd, &onStreamEvtCb_, para);
1645 }
1646
CreateServerProcessThread()1647 void VtpStreamSocket::CreateServerProcessThread()
1648 {
1649 auto self = this->GetSelf();
1650 std::thread([self]() {
1651 const std::string threadName = "OS_sntfStmLsn";
1652 pthread_setname_np(pthread_self(), threadName.c_str());
1653 self->NotifyStreamListener();
1654 }).detach();
1655
1656 std::thread([self]() {
1657 const std::string threadName = "OS_sdstyStmSkt";
1658 pthread_setname_np(pthread_self(), threadName.c_str());
1659 if (!self->Accept()) {
1660 self->DestroyStreamSocket();
1661 return;
1662 }
1663 self->DoStreamRecv();
1664 self->DestroyStreamSocket();
1665 }).detach();
1666
1667 bool &isDestroyed = isDestroyed_;
1668 std::thread([self, &isDestroyed]() {
1669 const std::string threadName = "OS_sfillStatic";
1670 pthread_setname_np(pthread_self(), threadName.c_str());
1671 while (!isDestroyed) {
1672 self->FillpAppStatistics();
1673 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1674 }
1675 }).detach();
1676 }
1677
CreateClientProcessThread()1678 void VtpStreamSocket::CreateClientProcessThread()
1679 {
1680 auto self = this->GetSelf();
1681 std::thread([self]() {
1682 const std::string threadName = "OS_cntfStmLsn";
1683 pthread_setname_np(pthread_self(), threadName.c_str());
1684 self->NotifyStreamListener();
1685 }).detach();
1686
1687 std::thread([self]() {
1688 const std::string threadName = "OS_cdstyStmSkt";
1689 pthread_setname_np(pthread_self(), threadName.c_str());
1690 self->DoStreamRecv();
1691 self->DestroyStreamSocket();
1692 }).detach();
1693
1694 bool &isDestroyed = isDestroyed_;
1695 std::thread([self, &isDestroyed]() {
1696 const std::string threadName = "OS_cfillStatic";
1697 pthread_setname_np(pthread_self(), threadName.c_str());
1698 while (!isDestroyed) {
1699 self->FillpAppStatistics();
1700 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1701 }
1702 }).detach();
1703 }
1704 } // namespace SoftBus
1705 } // namespace Communication
1706