1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "vtp_stream_socket.h"
17
18 #include <ifaddrs.h>
19 #include <thread>
20
21 #include "softbus_adapter_crypto.h"
22 #include "softbus_adapter_socket.h"
23 #include "stream_depacketizer.h"
24 #include "stream_packetizer.h"
25
26 namespace Communication {
27 namespace SoftBus {
/* Log switch, initially off. It is not read anywhere in the visible part of this file. */
bool g_logOn = false;

/* Feedback period of the fillp stream traffic statistics, in seconds. */
constexpr int FEED_BACK_PERIOD = 1;

/* Conversion factors used when a (sec, usec) timestamp is turned into milliseconds. */
constexpr int MS_PER_SECOND = 1000;
constexpr int US_PER_MS = 1000;
32
33 namespace {
PrintOptionInfo(int type,const StreamAttr & value)34 void PrintOptionInfo(int type, const StreamAttr &value)
35 {
36 switch (value.GetType()) {
37 case INT_TYPE:
38 TRANS_LOGI(TRANS_STREAM,
39 "Int option: type=%{public}d, value=%{public}d", type, value.GetIntValue());
40 break;
41 case BOOL_TYPE:
42 TRANS_LOGI(TRANS_STREAM,
43 "Bool option: type=%{public}d, value=%{public}d", type, value.GetBoolValue());
44 break;
45 case STRING_TYPE:
46 TRANS_LOGD(TRANS_STREAM,
47 "String option: type=%{public}d, value=%{public}s", type, value.GetStrValue().c_str());
48 break;
49 default:
50 TRANS_LOGE(TRANS_STREAM, "Wrong StreamAttr!");
51 (void)type;
52 }
53 }
54 } // namespace
55 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
56
57 std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
58 std::mutex VtpStreamSocket::g_streamSocketLockMapLock_;
59 std::map<int, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
60 std::mutex VtpStreamSocket::g_streamSocketMapLock_;
61
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)62 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo* frameInfo,
63 const Communication::SoftBus::StreamFrameInfo* streamFrameInfo)
64 {
65 frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
66 frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
67 frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
68 frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
69 frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
70 frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
71 }
72
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)73 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
74 {
75 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
76 if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
77 TRANS_LOGE(TRANS_STREAM, "streamsocketlock for the fd already exists. fd=%{public}d", fd);
78 return;
79 }
80
81 g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
82 }
83
AddStreamSocketListener(int fd,std::shared_ptr<VtpStreamSocket> streamreceiver)84 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
85 {
86 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
87 if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
88 TRANS_LOGE(TRANS_STREAM, "streamreceiver for the fd already exists. fd=%{public}d", fd);
89 return;
90 }
91
92 g_streamSocketMap.emplace(std::pair<int, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
93 }
94
RemoveStreamSocketLock(int fd)95 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
96 {
97 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
98 if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
99 g_streamSocketLockMap.erase(fd);
100 TRANS_LOGI(TRANS_STREAM, "Remove streamsocketlock for the fd success. fd=%{public}d", fd);
101 } else {
102 TRANS_LOGE(TRANS_STREAM,
103 "Streamsocketlock for the fd not exist in the map. fd=%{public}d", fd);
104 }
105 }
106
RemoveStreamSocketListener(int fd)107 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
108 {
109 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
110 if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
111 g_streamSocketMap.erase(fd);
112 TRANS_LOGI(TRANS_STREAM, "Remove streamreceiver for the fd success. fd=%{public}d", fd);
113 } else {
114 TRANS_LOGE(TRANS_STREAM, "Streamreceiver for the fd not exist in the map. fd=%{public}d", fd);
115 }
116 }
117
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)118 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
119 {
120 OptionFunc fun = {
121 valueType, set, get
122 };
123 optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
124 }
125
VtpStreamSocket()126 VtpStreamSocket::VtpStreamSocket()
127 {
128 InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
129 InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
130 InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
131 InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
132 InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
133 InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
134 InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
135 InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
136 InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
137 InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
138 InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
139 InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
140 InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
141 &VtpStreamSocket::GetVtpStackConfig);
142 InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
143 &VtpStreamSocket::GetVtpStackConfig);
144 InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
145 &VtpStreamSocket::GetVtpStackConfig);
146 InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
147 &VtpStreamSocket::GetVtpStackConfig);
148 InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
149 &VtpStreamSocket::GetVtpStackConfig);
150 InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
151 &VtpStreamSocket::GetVtpStackConfig);
152 InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
153 &VtpStreamSocket::GetVtpStackConfig);
154 InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
155 &VtpStreamSocket::GetVtpStackConfig);
156 InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
157 &VtpStreamSocket::GetVtpStackConfig);
158 InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
159 &VtpStreamSocket::GetVtpStackConfig);
160 InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
161 &VtpStreamSocket::GetVtpStackConfig);
162 InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
163 &VtpStreamSocket::GetVtpStackConfig);
164 InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
165 InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
166 InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
167 InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
168
169 scene_ = UNKNOWN_SCENE;
170 }
171
~VtpStreamSocket()172 VtpStreamSocket::~VtpStreamSocket()
173 {
174 TRANS_LOGW(TRANS_STREAM, "~VtpStreamSocket");
175 }
176
GetSelf()177 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
178 {
179 return shared_from_this();
180 }
181
HandleFillpFrameStats(int fd,const FtEventCbkInfo * info)182 int VtpStreamSocket::HandleFillpFrameStats(int fd, const FtEventCbkInfo *info)
183 {
184 if (info == nullptr) {
185 TRANS_LOGE(TRANS_STREAM, "stats info is nullptr");
186 return SOFTBUS_INVALID_PARAM;
187 }
188 StreamSendStats stats = {};
189 if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
190 sizeof(info->info.frameSendStats)) != EOK) {
191 TRANS_LOGE(TRANS_STREAM, "streamStats info memcpy fail");
192 return SOFTBUS_MEM_ERR;
193 }
194
195 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
196 auto itListener = g_streamSocketMap.find(fd);
197 if (itListener != g_streamSocketMap.end()) {
198 if (itListener->second->streamReceiver_ != nullptr) {
199 TRANS_LOGD(TRANS_STREAM, "OnFrameStats enter");
200 itListener->second->streamReceiver_->OnFrameStats(&stats);
201 } else {
202 TRANS_LOGE(TRANS_STREAM, "streamReceiver_ is nullptr");
203 }
204 } else {
205 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the fd is empty in the map. fd=%{public}d", fd);
206 }
207 return SOFTBUS_OK;
208 }
209
HandleRipplePolicy(int fd,const FtEventCbkInfo * info)210 int VtpStreamSocket::HandleRipplePolicy(int fd, const FtEventCbkInfo *info)
211 {
212 if (info == nullptr) {
213 TRANS_LOGE(TRANS_STREAM, "policy info is nullptr");
214 return SOFTBUS_INVALID_PARAM;
215 }
216 TrafficStats stats;
217 (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
218 if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
219 sizeof(info->info.trafficData.stats)) != EOK) {
220 TRANS_LOGE(TRANS_STREAM, "RipplePolicy info memcpy fail");
221 return SOFTBUS_MEM_ERR;
222 }
223 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
224 auto itListener = g_streamSocketMap.find(fd);
225 if (itListener != g_streamSocketMap.end()) {
226 if (itListener->second->streamReceiver_ != nullptr) {
227 TRANS_LOGI(TRANS_STREAM, "OnRippleStats enter");
228 itListener->second->streamReceiver_->OnRippleStats(&stats);
229 } else {
230 TRANS_LOGE(TRANS_STREAM, "OnRippleStats streamReceiver_ is nullptr");
231 }
232 } else {
233 TRANS_LOGE(TRANS_STREAM,
234 "OnRippleStats streamReceiver for the fd is empty in the map. fd=%{public}d", fd);
235 }
236 return SOFTBUS_OK;
237 }
238
HandleFillpFrameEvt(int fd,const FtEventCbkInfo * info)239 int VtpStreamSocket::HandleFillpFrameEvt(int fd, const FtEventCbkInfo *info)
240 {
241 if (info == nullptr) {
242 TRANS_LOGE(TRANS_STREAM, "fd is %{public}d, info is nullptr", fd);
243 return SOFTBUS_INVALID_PARAM;
244 }
245 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
246 auto itListener = g_streamSocketMap.find(fd);
247 if (itListener != g_streamSocketMap.end()) {
248 return itListener->second->HandleFillpFrameEvtInner(fd, info);
249 } else {
250 TRANS_LOGE(TRANS_STREAM, "OnFillpFrameEvt for the fd is empty in the map. fd=%{public}d", fd);
251 }
252 return SOFTBUS_OK;
253 }
254
HandleFillpFrameEvtInner(int fd,const FtEventCbkInfo * info)255 int VtpStreamSocket::HandleFillpFrameEvtInner(int fd, const FtEventCbkInfo *info)
256 {
257 if (onStreamEvtCb_ != nullptr) {
258 TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ enter");
259 return HandleVtpFrameEvt(fd, onStreamEvtCb_, info);
260 } else {
261 TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ is nullptr");
262 }
263 return SOFTBUS_OK;
264 }
265
#ifdef FILLP_SUPPORT_BW_DET
/*
 * Translates a bandwidth (FT_EVT_BW_DET) or jitter (FT_EVT_JITTER_DET) event into `metricList`
 * and logs the reported values. Any other event leaves `metricList` unchanged.
 * Nothing is done when `info` or `metricList` is null.
 */
void VtpStreamSocket::FillSupportDet(int fd, const FtEventCbkInfo *info, QosTv *metricList)
{
    if (info == nullptr || metricList == nullptr) {
        TRANS_LOGE(TRANS_STREAM, "info or metricList is nullptr");
        return;
    }

    if (info->evt == FT_EVT_BW_DET) {
        const auto &bandwidth = info->info.bwInfo;
        TRANS_LOGI(TRANS_STREAM,
            "[Metric Return]: Fillp bandwidth information of socket fd=%{public}d is returned", fd);
        TRANS_LOGI(TRANS_STREAM,
            "[Metric Return]: Changed amount of current available bandwidth=%{public}d", bandwidth.bwStat);
        TRANS_LOGI(TRANS_STREAM,
            "[Metric Return]: Current bandwidth for receiving data rate=%{public}d kbps", bandwidth.rate);
        metricList->type = BANDWIDTH_ESTIMATE_VALUE;
        metricList->info.bandwidthInfo.trend = bandwidth.bwStat;
        metricList->info.bandwidthInfo.rate = bandwidth.rate;
    }

    if (info->evt == FT_EVT_JITTER_DET) {
        const auto &jitter = info->info.jitterInfo;
        TRANS_LOGI(TRANS_STREAM,
            "[Metric Return]: Fillp connection quality information of socket fd=%{public}d is returned", fd);
        TRANS_LOGI(TRANS_STREAM,
            "[Metric Return]: Predeicted network condition jitterLevel=%{public}d", jitter.jitterLevel);
        TRANS_LOGI(TRANS_STREAM,
            "[Metric Return]: Current available receiving buffer time=%{public}d ms", jitter.bufferTime);
        metricList->type = JITTER_DETECTION_VALUE;
        metricList->info.jitterInfo.jitterLevel = jitter.jitterLevel;
        metricList->info.jitterInfo.bufferTime = jitter.bufferTime;
    }
}
#endif
297
298 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int fd,const FtEventCbkInfo * info)299 int VtpStreamSocket::FillpStatistics(int fd, const FtEventCbkInfo *info)
300 {
301 if (info == nullptr || fd < 0) {
302 TRANS_LOGE(TRANS_STREAM, "param invalid fd is %{public}d", fd);
303 return SOFTBUS_INVALID_PARAM;
304 }
305 if (info->evt == FT_EVT_FRAME_STATS) {
306 TRANS_LOGI(TRANS_STREAM, "recv fillp frame stats");
307 return HandleFillpFrameStats(fd, info);
308 } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
309 TRANS_LOGI(TRANS_STREAM, "recv fillp traffic data");
310 return HandleRipplePolicy(fd, info);
311 } else if (IsVtpFrameSentEvt(info)) {
312 TRANS_LOGI(TRANS_STREAM, "fd %{public}d recv fillp frame send evt", fd);
313 return HandleFillpFrameEvt(fd, info);
314 }
315 #ifdef FILLP_SUPPORT_BW_DET
316 if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
317 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
318 int16_t tvCount = 1;
319 QosTv metricList = {};
320
321 FillSupportDet(fd, info, &metricList);
322 metricList.info.wifiChannelInfo = {};
323 metricList.info.frameStatusInfo = {};
324
325 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
326 auto itLock = g_streamSocketLockMap.find(fd);
327 if (itLock != g_streamSocketLockMap.end()) {
328 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
329 auto itListener = g_streamSocketMap.find(fd);
330 if (itListener != g_streamSocketMap.end()) {
331 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
332 const std::string threadName = "OS_qosEvent";
333 pthread_setname_np(pthread_self(), threadName.c_str());
334 std::lock_guard<std::mutex> guard(itLock->second);
335 itListener->second->OnQosEvent(eventId, tvCount, &metricList);
336 }).detach();
337 } else {
338 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for fd=%{public}d is empty in the map", fd);
339 }
340 } else {
341 TRANS_LOGE(TRANS_STREAM, "StreamSocketLock for fd=%{public}d is empty in the map", fd);
342 }
343 } else {
344 TRANS_LOGE(TRANS_STREAM,
345 "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
346 return -1;
347 }
348 #endif
349 return SOFTBUS_OK;
350 }
351
FillpAppStatistics()352 void VtpStreamSocket::FillpAppStatistics()
353 {
354 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
355 int16_t tvCount = 1;
356 QosTv metricList = {};
357 FillpStatisticsPcb fillpPcbStats = {};
358 SoftBusSysTime fillpStatsGetTime = {0};
359
360 int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
361 SoftBusGetTime(&fillpStatsGetTime);
362 if (getStatisticsRet == 0) {
363 metricList.type = STREAM_TRAFFIC_STASTICS;
364 metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
365 MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
366 metricList.info.appStatistics.periodRecvBits =
367 static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
368 metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
369 metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
370 metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
371 metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
372 metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
373 metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
374 metricList.info.appStatistics.periodRecvPktLossHighPrecision =
375 fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
376 metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
377 metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
378 metricList.info.appStatistics.periodSendPktLossHighPrecision =
379 fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
380 metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
381 metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
382
383 TRANS_LOGD(TRANS_STREAM,
384 "Succeed to get fillp statistics information for streamfd=%{public}d", streamFd_);
385 TRANS_LOGD(TRANS_STREAM,
386 "[Metric Return]: periodRtt=%{public}d", fillpPcbStats.appFcStastics.periodRtt);
387
388 std::lock_guard<std::mutex> guard(streamSocketLock_);
389
390 if (streamReceiver_ != nullptr) {
391 TRANS_LOGD(TRANS_STREAM,
392 "[Metric Notify]: Fillp traffic statistics information of socket is notified. streamfd=%{public}d",
393 streamFd_);
394 streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
395 } else {
396 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the streamFd is empty. streamFd=%{public}d", streamFd_);
397 }
398 } else {
399 TRANS_LOGE(TRANS_STREAM,
400 "Fail to get fillp statistics information for the streamfd. streamfd=%{public}d, errno=%{public}d",
401 streamFd_, FtGetErrno());
402 }
403 }
404
CreateClient(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)405 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
406 {
407 int fd = CreateAndBindSocket(local, false);
408 if (fd == -1) {
409 TRANS_LOGE(TRANS_STREAM, "CreateAndBindSocket failed, errno=%{public}d", FtGetErrno());
410 DestroyStreamSocket();
411 return false;
412 }
413
414 sessionKey_.second = sessionKey.second;
415 if (sessionKey_.first == nullptr) {
416 sessionKey_.first = new uint8_t[sessionKey_.second];
417 }
418 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
419 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
420 return false;
421 }
422
423 streamType_ = streamType;
424 std::lock_guard<std::mutex> guard(streamSocketLock_);
425 streamFd_ = fd;
426 configCv_.notify_all();
427
428 TRANS_LOGI(TRANS_STREAM,
429 "Success to create a client socket. fd=%{public}d, streamType=%{public}d", fd, streamType);
430 return true;
431 }
432
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)433 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
434 std::pair<uint8_t*, uint32_t> sessionKey)
435 {
436 if (!CreateClient(local, streamType, sessionKey)) {
437 return false;
438 }
439 /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
440 #ifdef FILLP_SUPPORT_BW_DET
441 bool isServer = false;
442 EnableBwEstimationAlgo(streamFd_, isServer);
443 #endif
444
445 bool connectRet = Connect(remote);
446 if (connectRet) {
447 bool isServer = false;
448 RegisterMetricCallback(isServer); /* register the callback function */
449 }
450 return connectRet;
451 }
452
CreateServer(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)453 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
454 {
455 TRANS_LOGD(TRANS_STREAM, "enter.");
456 listenFd_ = CreateAndBindSocket(local, true);
457 if (listenFd_ == -1) {
458 TRANS_LOGE(TRANS_STREAM, "create listenFd failed, errno=%{public}d", FtGetErrno());
459 DestroyStreamSocket();
460 return false;
461 }
462
463 bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
464 if (ret) {
465 TRANS_LOGE(TRANS_STREAM, "FtListen failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
466 DestroyStreamSocket();
467 return false;
468 }
469
470 epollFd_ = FtEpollCreate();
471 if (epollFd_ < 0) {
472 TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
473 DestroyStreamSocket();
474 return false;
475 }
476 isStreamRecv_ = true;
477 streamType_ = streamType;
478 sessionKey_.second = sessionKey.second;
479 if (sessionKey_.first == nullptr) {
480 sessionKey_.first = new uint8_t[sessionKey_.second];
481 }
482 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
483 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
484 return false;
485 }
486
487 CreateServerProcessThread();
488 TRANS_LOGI(TRANS_STREAM,
489 "CreateServer end, listenFd=%{public}d, epollFd=%{public}d, streamType=%{public}d", listenFd_, epollFd_,
490 streamType_);
491 return true;
492 }
493
DestroyStreamSocket()494 void VtpStreamSocket::DestroyStreamSocket()
495 {
496 TRANS_LOGD(TRANS_STREAM, "enter.");
497 std::lock_guard<std::mutex> guard(streamSocketLock_);
498 if (isDestroyed_) {
499 TRANS_LOGI(TRANS_STREAM, "StreamSocket is already destroyed");
500 return;
501 }
502 if (listenFd_ != -1) {
503 TRANS_LOGI(TRANS_STREAM, "listenFd_ enter FtClose");
504 FtClose(listenFd_);
505 listenFd_ = -1;
506 }
507
508 if (streamFd_ != -1) {
509 RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
510 RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
511 TRANS_LOGI(TRANS_STREAM, "streamFd_ enter FtClose");
512 FtClose(streamFd_);
513 streamFd_ = -1;
514 }
515
516 if (epollFd_ != -1) {
517 TRANS_LOGI(TRANS_STREAM, "epollFd_ enter FtClose");
518 FtClose(epollFd_);
519 epollFd_ = -1;
520 }
521
522 if (streamReceiver_ != nullptr) {
523 TRANS_LOGI(TRANS_STREAM, "DestroyStreamSocket receiver delete");
524 streamReceiver_->OnStreamStatus(STREAM_CLOSED);
525 streamReceiver_.reset();
526 }
527
528 QuitStreamBuffer();
529 vtpInstance_->UpdateSocketStreamCount(false);
530 isDestroyed_ = true;
531 TRANS_LOGD(TRANS_STREAM, "ok");
532 }
533
Connect(const IpAndPort & remote)534 bool VtpStreamSocket::Connect(const IpAndPort &remote)
535 {
536 if (remote.ip.empty()) {
537 TRANS_LOGE(TRANS_STREAM, "remote addr error, ip is nullptr");
538 DestroyStreamSocket();
539 return false;
540 }
541
542 TRANS_LOGD(TRANS_STREAM,
543 "Connect to server remotePort=%{public}d", remote.port);
544 remoteIpPort_ = remote;
545
546 struct sockaddr_in remoteSockAddr;
547 remoteSockAddr.sin_family = AF_INET;
548 remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
549 remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
550
551 int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
552 if (ret != SOFTBUS_OK) {
553 TRANS_LOGE(TRANS_STREAM, "FtConnect failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
554 DestroyStreamSocket();
555 return false;
556 }
557
558 epollFd_ = FtEpollCreate();
559 if (epollFd_ < 0) {
560 TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
561 DestroyStreamSocket();
562 return false;
563 }
564
565 if (SetSocketEpollMode(streamFd_) != ERR_OK) {
566 TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, streamFd=%{public}d", streamFd_);
567 DestroyStreamSocket();
568 return false;
569 }
570 isStreamRecv_ = true;
571 TRANS_LOGI(TRANS_STREAM, "Success to connect remote, and create a thread to recv data.");
572
573 CreateClientProcessThread();
574 return true;
575 }
576
EncryptStreamPacket(std::unique_ptr<IStream> stream,std::unique_ptr<char[]> & data,ssize_t & len)577 bool VtpStreamSocket::EncryptStreamPacket(std::unique_ptr<IStream> stream, std::unique_ptr<char[]> &data, ssize_t &len)
578 {
579 StreamPacketizer packet(streamType_, std::move(stream));
580 auto plainData = packet.PacketizeStream();
581 if (plainData == nullptr) {
582 TRANS_LOGE(TRANS_STREAM, "PacketizeStream failed");
583 return false;
584 }
585 len = packet.GetPacketLen() + GetEncryptOverhead();
586 TRANS_LOGD(TRANS_STREAM, "packetLen=%{public}zd, encryptOverhead=%{public}zd",
587 packet.GetPacketLen(), GetEncryptOverhead());
588 data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
589 ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
590 if (encLen != len) {
591 TRANS_LOGE(TRANS_STREAM, "encrypted failed, dataLen=%{public}zd, encLen=%{public}zd", len, encLen);
592 return false;
593 }
594 InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
595 len += FRAME_HEADER_LEN;
596
597 return true;
598 }
599
Send(std::unique_ptr<IStream> stream)600 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
601 {
602 TRANS_LOGD(TRANS_STREAM, "send in... streamType=%{public}d, dataSize=%{public}zd, extSize=%{public}zd",
603 streamType_, stream->GetBufferLen(), stream->GetExtBufferLen());
604
605 if (!isBlocked_) {
606 isBlocked_ = true;
607 if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
608 TRANS_LOGE(TRANS_STREAM, "set non block mode fail");
609 return false;
610 }
611 }
612
613 int32_t ret = -1;
614 std::unique_ptr<char[]> data = nullptr;
615 ssize_t len = 0;
616
617 const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
618 if (streamFrameInfo == nullptr) {
619 TRANS_LOGE(TRANS_STREAM, "streamFrameInfo is null");
620 return false;
621 }
622 FrameInfo frameInfo;
623 ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
624
625 if (streamType_ == RAW_STREAM) {
626 data = stream->GetBuffer();
627 len = stream->GetBufferLen();
628
629 ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
630 } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
631 if (!EncryptStreamPacket(std::move(stream), data, len)) {
632 return false;
633 }
634 ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
635 }
636
637 if (ret == -1) {
638 TRANS_LOGE(TRANS_STREAM, "send failed, errno=%{public}d", FtGetErrno());
639 return false;
640 }
641
642 TRANS_LOGD(TRANS_STREAM, "send out..., streamType=%{public}d, len=%{public}zd", streamType_, len);
643 return true;
644 }
645
SetOption(int type,const StreamAttr & value)646 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
647 {
648 PrintOptionInfo(type, value);
649 auto it = optFuncMap_.find(type);
650 if (it == optFuncMap_.end()) {
651 TRANS_LOGW(TRANS_STREAM, "not found type=%{public}d", type);
652 return false;
653 }
654
655 if (value.GetType() != it->second.valueType) {
656 TRANS_LOGW(TRANS_STREAM,
657 "type=%{public}d, valueType=%{public}d", value.GetType(), it->second.valueType);
658 return false;
659 }
660
661 MySetFunc set = it->second.set;
662 if (set == nullptr) {
663 TRANS_LOGW(TRANS_STREAM, "set is nullptr");
664 return false;
665 }
666
667 if (type == NON_BLOCK || type == TOS) {
668 return (this->*set)(static_cast<int>(streamFd_), value);
669 }
670
671 auto outerIt = FILLP_TYPE_MAP.find(type);
672 if (outerIt != FILLP_TYPE_MAP.end()) {
673 return (this->*set)(outerIt->second, value);
674 }
675
676 auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
677 if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
678 return (this->*set)(innerIt->second, value);
679 }
680
681 return (this->*set)(static_cast<int>(type), value);
682 }
683
GetOption(int type) const684 StreamAttr VtpStreamSocket::GetOption(int type) const
685 {
686 StreamAttr attr {};
687 auto it = optFuncMap_.find(type);
688 if (it != optFuncMap_.end()) {
689 MyGetFunc get = it->second.get;
690 if (get == nullptr) {
691 TRANS_LOGE(TRANS_STREAM, "Can not get option type=%{public}d", type);
692 return std::move(StreamAttr());
693 }
694 if (type == NON_BLOCK) {
695 attr = (this->*get)(static_cast<int>(streamFd_));
696 } else {
697 attr = (this->*get)(static_cast<int>(type));
698 }
699 }
700
701 PrintOptionInfo(type, attr);
702 return attr;
703 }
704
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)705 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
706 {
707 if (receiver == nullptr) {
708 TRANS_LOGW(TRANS_STREAM, "receiver is nullptr");
709 return false;
710 }
711
712 std::lock_guard<std::mutex> guard(streamSocketLock_);
713 streamReceiver_ = receiver;
714 TRANS_LOGI(TRANS_STREAM, "set receiver success");
715 return true;
716 }
717
InitVtpInstance(const std::string & pkgName)718 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
719 {
720 return vtpInstance_->InitVtp(pkgName);
721 }
722
DestroyVtpInstance(const std::string & pkgName)723 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
724 {
725 TRANS_LOGD(TRANS_STREAM, "enter.");
726 vtpInstance_->DestroyVtp(pkgName);
727 TRANS_LOGD(TRANS_STREAM, "ok");
728 }
729
CreateAndBindSocket(IpAndPort & local,bool isServer)730 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local, bool isServer)
731 {
732 localIpPort_ = local;
733 vtpInstance_->UpdateSocketStreamCount(true);
734 if (local.ip.empty()) {
735 TRANS_LOGE(TRANS_STREAM, "ip is empty");
736 return -1;
737 }
738
739 int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
740 if (sockFd == -1) {
741 TRANS_LOGE(TRANS_STREAM, "FtSocket failed, errno=%{public}d", FtGetErrno());
742 return -1;
743 }
744 if (!isServer) {
745 TRANS_LOGI(TRANS_STREAM, "FtSocket set client, errno=%{public}d", FtGetErrno());
746 streamFd_ = sockFd;
747 }
748 SetDefaultConfig(sockFd);
749
750 // bind
751 sockaddr_in localSockAddr = { 0 };
752 char host[ADDR_MAX_SIZE];
753 localSockAddr.sin_family = AF_INET;
754 localSockAddr.sin_port = htons((short)local.port);
755 localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
756 localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
757 if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
758 TRANS_LOGE(TRANS_STREAM, "SetSocketBoundInner failed, errno=%{public}d", FtGetErrno());
759 }
760
761 socklen_t localAddrLen = sizeof(localSockAddr);
762 int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
763 if (ret == -1) {
764 FtClose(sockFd);
765 TRANS_LOGE(TRANS_STREAM, "FtBind failed, errno=%{public}d", FtGetErrno());
766 return -1;
767 }
768
769 // 获取port
770 ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
771 if (ret != ERR_OK) {
772 TRANS_LOGE(TRANS_STREAM, "getsockname error ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
773 FtClose(sockFd);
774 return -1;
775 }
776
777 localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
778 local.port = localIpPort_.port;
779
780 return sockFd;
781 }
782
783
EnableBwEstimationAlgo(int streamFd,bool isServer) const784 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
785 {
786 #ifdef FILLP_SUPPORT_BW_DET
787 int errBwDet;
788 if (isServer) {
789 int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
790 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
791 &enableBwDet, sizeof(enableBwDet));
792 } else {
793 int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
794 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
795 &enableBwDet, sizeof(enableBwDet));
796 }
797 if (errBwDet < 0) {
798 TRANS_LOGE(TRANS_STREAM,
799 "Fail to enable bandwidth estimation algorithm for streamFd=%{public}d, errno%{public}d",
800 streamFd, FtGetErrno());
801 return true;
802 } else {
803 TRANS_LOGE(TRANS_STREAM,
804 "Success to enable bandwidth estimation algorithm for stream=Fd%{public}d", streamFd);
805 return false;
806 }
807 #else
808 return true;
809 #endif
810 }
811
EnableJitterDetectionAlgo(int streamFd) const812 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
813 {
814 #ifdef FILLP_SUPPORT_CQE
815 int32_t enableCQE = FILLP_CQE_ENABLE;
816 auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
817 if (errCQE < 0) {
818 TRANS_LOGE(TRANS_STREAM,
819 "Fail to enable CQE algorithm for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
820 return true;
821 } else {
822 TRANS_LOGE(TRANS_STREAM,
823 "Success to enable CQE algorithm for streamFd=%{public}d", streamFd);
824 return false;
825 }
826 #else
827 return true;
828 #endif
829 }
830
EnableDirectlySend(int streamFd) const831 bool VtpStreamSocket::EnableDirectlySend(int streamFd) const
832 {
833 int32_t enable = 1;
834 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
835 if (ret < 0) {
836 TRANS_LOGE(TRANS_STREAM,
837 "Fail to enable direct send for streamFd=%{public}d, rrno=%{public}d", streamFd, FtGetErrno());
838 return false;
839 }
840 TRANS_LOGI(TRANS_STREAM, "Success to enable direct send for streamFd=%{public}d", streamFd);
841 return true;
842 }
843
EnableSemiReliable(int streamFd) const844 bool VtpStreamSocket::EnableSemiReliable(int streamFd) const
845 {
846 int32_t enable = 1;
847 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
848 if (ret < 0) {
849 TRANS_LOGE(TRANS_STREAM,
850 "Fail to enable direct send for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
851 return false;
852 }
853 TRANS_LOGI(TRANS_STREAM, "Success to enable semi reliable for streamFd=%{public}d", streamFd);
854 return true;
855 }
856
RegisterMetricCallback(bool isServer)857 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
858 {
859 VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
860 auto self = this->GetSelf();
861 VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
862 int regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
863 int value = 1;
864 auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
865 if (err < 0) {
866 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
867 return;
868 }
869 TRANS_LOGD(TRANS_STREAM, "FtSetSockOpt start success");
870 if (isServer) {
871 if (regStatisticsRet == 0) {
872 TRANS_LOGI(TRANS_STREAM,
873 "Success to register the stream callback function at server side. streamFd=%{public}d", streamFd_);
874 } else {
875 TRANS_LOGE(TRANS_STREAM,
876 "Fail to register the stream callback function at server side. streamFd=%{public}d, errno=%{public}d",
877 streamFd_, regStatisticsRet);
878 }
879 } else {
880 if (regStatisticsRet == 0) {
881 TRANS_LOGI(TRANS_STREAM,
882 "Success to register the stream callback function at client side. streamFd=%{public}d", streamFd_);
883 } else {
884 TRANS_LOGE(TRANS_STREAM,
885 "Fail to register the stream callback function at client side. streamFd=%{public}d, errno=%{public}d",
886 streamFd_, regStatisticsRet);
887 }
888 }
889 }
890
Accept()891 bool VtpStreamSocket::Accept()
892 {
893 TRANS_LOGD(TRANS_STREAM, "enter.");
894 auto fd = FtAccept(listenFd_, nullptr, nullptr);
895 TRANS_LOGI(TRANS_STREAM, "accept streamFd=%{public}d", fd);
896 if (fd == -1) {
897 TRANS_LOGE(TRANS_STREAM, "errno=%{public}d", FtGetErrno());
898 return false;
899 }
900
901 sockaddr remoteAddr {};
902 socklen_t remoteAddrLen = sizeof(remoteAddr);
903 auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
904 if (ret != ERR_OK) {
905 TRANS_LOGE(TRANS_STREAM, "get name failed, fd=%{public}d", fd);
906 FtClose(fd);
907 return false;
908 }
909
910 char host[ADDR_MAX_SIZE];
911 if (remoteAddr.sa_family == AF_INET) {
912 auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
913 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
914 remoteIpPort_.port = v4Addr->sin_port;
915 } else {
916 auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
917 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
918 remoteIpPort_.port = v6Addr->sin6_port;
919 }
920
921 TRANS_LOGD(TRANS_STREAM, "Accept a client remotePort=%{public}d", remoteIpPort_.port);
922
923 if (SetSocketEpollMode(fd) != ERR_OK) {
924 TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, fd=%{public}d", fd);
925 FtClose(fd);
926 return false;
927 }
928
929 std::lock_guard<std::mutex> guard(streamSocketLock_);
930 streamFd_ = fd;
931 configCv_.notify_all();
932
933 if (streamReceiver_ != nullptr) {
934 TRANS_LOGI(TRANS_STREAM, "notify stream connected!");
935 streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
936 }
937
938 bool isServer = true;
939 RegisterMetricCallback(isServer); /* register the callback function */
940 /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
941 #ifdef FILLP_SUPPORT_BW_DET
942 EnableBwEstimationAlgo(streamFd_, isServer);
943 #endif
944 #ifdef FILLP_SUPPORT_CQE
945 EnableJitterDetectionAlgo(streamFd_);
946 #endif
947
948 TRANS_LOGI(TRANS_STREAM, "accept success!");
949 return true;
950 }
951
EpollTimeout(int fd,int timeout)952 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
953 {
954 struct SpungeEpollEvent events[MAX_EPOLL_NUM];
955 (void)memset_s(events, sizeof(events), 0, sizeof(events));
956 while (true) {
957 FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
958 if (fdNum <= 0) {
959 TRANS_LOGE(TRANS_STREAM,
960 "FtEpollWait failed, ret=%{public}d, errno=%{public}d", fdNum, FtGetErrno());
961 return -FtGetErrno();
962 }
963
964 for (FILLP_INT i = 0; i < fdNum; i++) {
965 if (events[i].data.fd != fd) {
966 continue;
967 }
968
969 if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
970 TRANS_LOGE(TRANS_STREAM,
971 "EpollTimeout, something may be wrong in this socket, fd=%{public}d, events=%{public}u", fd,
972 (unsigned int)events[i].events);
973 return -1;
974 }
975
976 if (events[i].events & SPUNGE_EPOLLIN) {
977 return SOFTBUS_OK;
978 }
979 }
980 }
981 }
982
SetSocketEpollMode(int fd)983 int VtpStreamSocket::SetSocketEpollMode(int fd)
984 {
985 if (!SetNonBlockMode(fd, StreamAttr(true))) {
986 TRANS_LOGE(TRANS_STREAM, "SetNonBlockMode failed, errno=%{public}d", FtGetErrno());
987 return -1;
988 }
989
990 struct SpungeEpollEvent event = {0};
991 event.events = SPUNGE_EPOLLIN;
992 event.data.fd = fd;
993
994 auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
995 if (ret != ERR_OK) {
996 TRANS_LOGE(TRANS_STREAM, "FtEpollCtl failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
997 return ret;
998 }
999
1000 TRANS_LOGD(TRANS_STREAM, "SetNonBlockMode success");
1001 return SOFTBUS_OK;
1002 }
1003
InsertBufferLength(int num,int length,uint8_t * output) const1004 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
1005 {
1006 for (int i = 0; i < length; i++) {
1007 output[length - 1 - i] = static_cast<unsigned int>(
1008 ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1009 }
1010 }
1011
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1012 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1013 {
1014 std::unique_ptr<IStream> stream = nullptr;
1015 switch (streamType_) {
1016 case VIDEO_SLICE_STREAM:
1017 TRANS_LOGD(TRANS_STREAM, "do not support VIDEO_SLICE_STREAM streamType=%{public}d", streamType_);
1018 break;
1019 case COMMON_VIDEO_STREAM:
1020 case COMMON_AUDIO_STREAM:
1021 TRANS_LOGD(TRANS_STREAM,
1022 "streamType=%{public}d, seqNum=%{public}d, streamId=%{public}d",
1023 streamType_, info.seqNum, info.streamId);
1024 stream = IStream::MakeCommonStream(data, info);
1025 break;
1026 case RAW_STREAM:
1027 TRANS_LOGD(TRANS_STREAM, "streamType=%{public}d", streamType_);
1028 stream = IStream::MakeRawStream(data, info);
1029 break;
1030 default:
1031 TRANS_LOGE(TRANS_STREAM, "do not support streamType=%{public}d", streamType_);
1032 break;
1033 }
1034 if (stream == nullptr) {
1035 TRANS_LOGE(TRANS_STREAM, "IStream construct error");
1036 return nullptr;
1037 }
1038
1039 return stream;
1040 }
1041
RecvStreamLen()1042 int32_t VtpStreamSocket::RecvStreamLen()
1043 {
1044 int32_t hdrSize = FRAME_HEADER_LEN;
1045 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1046 hdrSize = streamHdrSize_;
1047 }
1048
1049 int32_t len = -1;
1050 int32_t timeout = -1;
1051 auto buffer = std::make_unique<char[]>(hdrSize);
1052 if (EpollTimeout(streamFd_, timeout) == 0) {
1053 do {
1054 len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1055 } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1056 }
1057 TRANS_LOGD(TRANS_STREAM, "recv frame header, len=%{public}d, scene=%{public}d", len, scene_);
1058
1059 if (len <= 0) {
1060 TRANS_LOGE(TRANS_STREAM, "len invalid, len=%{public}d", len);
1061 return -1;
1062 }
1063
1064 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1065 std::lock_guard<std::mutex> guard(streamSocketLock_);
1066 if (streamReceiver_ != nullptr) {
1067 return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1068 }
1069 }
1070
1071 return ntohl(*reinterpret_cast<int *>(buffer.get()));
1072 }
1073
ProcessCommonDataStream(std::unique_ptr<char[]> & dataBuffer,int32_t & dataLength,std::unique_ptr<char[]> & extBuffer,int32_t & extLen,StreamFrameInfo & info)1074 bool VtpStreamSocket::ProcessCommonDataStream(std::unique_ptr<char[]> &dataBuffer,
1075 int32_t &dataLength, std::unique_ptr<char[]> &extBuffer, int32_t &extLen, StreamFrameInfo &info)
1076 {
1077 TRANS_LOGD(TRANS_STREAM, "recv common stream");
1078 int32_t decryptedLength = dataLength;
1079 auto decryptedBuffer = std::move(dataBuffer);
1080
1081 int32_t plainDataLength = decryptedLength - GetEncryptOverhead();
1082 if (plainDataLength <= 0) {
1083 TRANS_LOGE(TRANS_STREAM, "Decrypt failed, invalid decryptedLen=%{public}d", decryptedLength);
1084 return false;
1085 }
1086 std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1087 ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1088 if (decLen != plainDataLength) {
1089 TRANS_LOGE(TRANS_STREAM,
1090 "Decrypt failed, dataLen=%{public}d, decryptedLen=%{public}zd", plainDataLength, decLen);
1091 return false;
1092 }
1093 auto header = plainData.get();
1094 StreamDepacketizer decode(streamType_);
1095 if (plainDataLength < static_cast<int32_t>(sizeof(CommonHeader))) {
1096 TRANS_LOGE(TRANS_STREAM,
1097 "failed, plainDataLen=%{public}d, CommonHeader=%{public}zu", plainDataLength, sizeof(CommonHeader));
1098 return false;
1099 }
1100 decode.DepacketizeHeader(header);
1101
1102 auto buffer = plainData.get() + sizeof(CommonHeader);
1103 decode.DepacketizeBuffer(buffer, plainDataLength - sizeof(CommonHeader));
1104
1105 extBuffer = decode.GetUserExt();
1106 extLen = decode.GetUserExtSize();
1107 info = decode.GetFrameInfo();
1108 dataBuffer = decode.GetData();
1109 dataLength = decode.GetDataLength();
1110 if (dataLength <= 0) {
1111 TRANS_LOGE(TRANS_STREAM, "common depacketize error, dataLen=%{public}d", dataLength);
1112 return false;
1113 }
1114 return true;
1115 }
1116
DoStreamRecv()1117 void VtpStreamSocket::DoStreamRecv()
1118 {
1119 while (isStreamRecv_) {
1120 std::unique_ptr<char[]> dataBuffer = nullptr;
1121 std::unique_ptr<char[]> extBuffer = nullptr;
1122 int32_t extLen = 0;
1123 StreamFrameInfo info = {};
1124 TRANS_LOGD(TRANS_STREAM, "recv stream");
1125 int32_t dataLength = VtpStreamSocket::RecvStreamLen();
1126 if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1127 TRANS_LOGE(TRANS_STREAM, "read frame length error, dataLength=%{public}d", dataLength);
1128 break;
1129 }
1130 TRANS_LOGD(TRANS_STREAM,
1131 "recv a new frame, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1132 dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1133
1134 if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1135 if (!ProcessCommonDataStream(dataBuffer, dataLength, extBuffer, extLen, info)) {
1136 break;
1137 }
1138 }
1139
1140 StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1141 std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1142 if (stream == nullptr) {
1143 TRANS_LOGE(TRANS_STREAM, "MakeStreamData failed, stream is null");
1144 break;
1145 }
1146
1147 TRANS_LOGD(TRANS_STREAM,
1148 "recv frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1149
1150 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1151 std::lock_guard<std::mutex> guard(streamSocketLock_);
1152 if (streamReceiver_ != nullptr) {
1153 streamReceiver_->OnStreamReceived(std::move(stream));
1154 continue;
1155 }
1156 }
1157
1158 PutStream(std::move(stream));
1159 TRANS_LOGD(TRANS_STREAM, "put frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1160 }
1161 TRANS_LOGI(TRANS_STREAM, "recv thread exit");
1162 }
1163
RecvStream(int32_t dataLength)1164 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int32_t dataLength)
1165 {
1166 auto buffer = std::make_unique<char[]>(dataLength);
1167 int32_t recvLen = 0;
1168 while (recvLen < dataLength) {
1169 int32_t ret = -1;
1170 int32_t timeout = -1;
1171
1172 if (EpollTimeout(streamFd_, timeout) == 0) {
1173 do {
1174 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1175 } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1176 }
1177
1178 if (ret == -1) {
1179 TRANS_LOGE(TRANS_STREAM, "read frame failed, errno=%{public}d", FtGetErrno());
1180 return nullptr;
1181 }
1182
1183 recvLen += ret;
1184 }
1185 return std::unique_ptr<char[]>(buffer.release());
1186 }
1187
SetDefaultConfig(int fd)1188 void VtpStreamSocket::SetDefaultConfig(int fd)
1189 {
1190 if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1191 TRANS_LOGW(TRANS_STREAM, "SetIpTos failed");
1192 }
1193 // Set Fillp direct sending
1194 if (!EnableDirectlySend(fd)) {
1195 TRANS_LOGW(TRANS_STREAM, "EnableDirectlySend failed");
1196 }
1197
1198 if (!EnableSemiReliable(fd)) {
1199 TRANS_LOGW(TRANS_STREAM, "EnableSemiReliable failed");
1200 }
1201 // Set Fillp Differentiated Transmission
1202 FILLP_BOOL enable = 1;
1203 if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1204 TRANS_LOGW(TRANS_STREAM, "Set differ transmit failed");
1205 }
1206
1207 if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1208 TRANS_LOGW(TRANS_STREAM, "Set recv buff failed");
1209 }
1210
1211 if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1212 TRANS_LOGW(TRANS_STREAM, "Set send buff failed");
1213 }
1214
1215 if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1216 TRANS_LOGW(TRANS_STREAM, "Set recv cache failed");
1217 }
1218
1219 if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1220 TRANS_LOGW(TRANS_STREAM, "Set send cache failed");
1221 }
1222 }
1223
SetIpTos(int fd,const StreamAttr & tos)1224 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1225 {
1226 auto tmp = tos.GetIntValue();
1227 if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1228 TRANS_LOGE(TRANS_STREAM, "SetIpTos wrong! fd=%{public}d, errno=%{public}d", fd, FtGetErrno());
1229 return false;
1230 }
1231
1232 TRANS_LOGD(TRANS_STREAM, "Success to set ip tos: fd=%{public}d, tos=%{public}d", fd, tmp);
1233 return true;
1234 }
1235
GetIpTos(int type) const1236 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1237 {
1238 static_cast<void>(type);
1239 int tos;
1240 int size = sizeof(tos);
1241
1242 if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1243 TRANS_LOGE(TRANS_STREAM, "FtGetSockOpt errno=%{public}d", FtGetErrno());
1244 return std::move(StreamAttr());
1245 }
1246
1247 return std::move(StreamAttr(tos));
1248 }
1249
GetStreamSocketFd(int type) const1250 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1251 {
1252 static_cast<void>(type);
1253 return std::move(StreamAttr(streamFd_));
1254 }
1255
GetListenSocketFd(int type) const1256 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1257 {
1258 static_cast<void>(type);
1259 return std::move(StreamAttr(listenFd_));
1260 }
1261
SetSocketBoundInner(int fd,std::string ip) const1262 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1263 {
1264 auto boundIp = (ip.empty()) ? localIpPort_.ip : ip;
1265 struct ifaddrs *ifList = nullptr;
1266 if (getifaddrs(&ifList) < 0) {
1267 TRANS_LOGE(TRANS_STREAM,
1268 "get interface address return errno=%{public}d, strerror=%{public}s", errno, strerror(errno));
1269 return false;
1270 }
1271
1272 struct ifaddrs *ifa = nullptr;
1273 for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1274 if (ifa->ifa_addr == nullptr) {
1275 continue;
1276 }
1277 if (ifa->ifa_addr->sa_family != AF_INET) {
1278 continue;
1279 }
1280
1281 char host[ADDR_MAX_SIZE];
1282 std::string devName(ifa->ifa_name);
1283 if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1284 host, ADDR_MAX_SIZE)) == 0) {
1285 TRANS_LOGI(TRANS_STREAM, "current use interface to bind to socket. ifName=%{public}s", ifa->ifa_name);
1286 auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1287 if (err < 0) {
1288 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
1289 freeifaddrs(ifList);
1290 return false;
1291 }
1292 break;
1293 }
1294 }
1295 freeifaddrs(ifList);
1296
1297 return true;
1298 }
1299
SetSocketBindToDevices(int type,const StreamAttr & ip)1300 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1301 {
1302 static_cast<void>(type);
1303 auto tmp = ip.GetStrValue();
1304 auto boundIp = (tmp.empty()) ? localIpPort_.ip : tmp;
1305 return SetSocketBoundInner(streamFd_, boundIp);
1306 }
1307
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1308 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1309 {
1310 std::unique_lock<std::mutex> lock(streamSocketLock_);
1311 if (streamFd_ == -1) {
1312 configCv_.wait(lock, [this] { return streamFd_ != -1; });
1313 }
1314 TRANS_LOGD(TRANS_STREAM, "set vtp stack config, streamFd=%{public}d", streamFd_);
1315 return SetVtpStackConfig(type, value);
1316 }
1317
SetVtpStackConfig(int type,const StreamAttr & value)1318 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1319 {
1320 if (streamFd_ == -1) {
1321 TRANS_LOGI(TRANS_STREAM, "set vtp stack config when streamFd is legal, type=%{public}d", type);
1322 auto self = GetSelf();
1323 std::thread([self, type, value]() {
1324 const std::string threadName = "OS_setVtpCfg";
1325 pthread_setname_np(pthread_self(), threadName.c_str());
1326 self->SetVtpStackConfigDelayed(type, value);
1327 }).detach();
1328 return true;
1329 }
1330
1331 if (value.GetType() == INT_TYPE) {
1332 int intVal = value.GetIntValue();
1333 int ret = FtConfigSet(type, &intVal, &streamFd_);
1334 if (ret != SOFTBUS_OK) {
1335 TRANS_LOGE(TRANS_STREAM,
1336 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1337 return false;
1338 }
1339
1340 TRANS_LOGI(TRANS_STREAM,
1341 "setVtpConfig success, type=%{public}d, streamFd=%{public}d, value=%{public}d", type, streamFd_, intVal);
1342 return true;
1343 }
1344
1345 if (value.GetType() == BOOL_TYPE) {
1346 bool flag = value.GetBoolValue();
1347 int ret = FtConfigSet(type, &flag, &streamFd_);
1348 if (ret != SOFTBUS_OK) {
1349 TRANS_LOGE(TRANS_STREAM,
1350 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1351 return false;
1352 }
1353
1354 TRANS_LOGI(TRANS_STREAM,
1355 "setVtpConfig success, streamFd=%{public}d, flag=%{public}d, type=%{public}d", type, streamFd_, flag);
1356 return true;
1357 }
1358
1359 TRANS_LOGE(TRANS_STREAM, "UNKNOWN TYPE!");
1360 return false;
1361 }
1362
GetVtpStackConfig(int type) const1363 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1364 {
1365 int intVal = -1;
1366 int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1367 int ret = FtConfigGet(type, &intVal, &configFd);
1368 if (ret != SOFTBUS_OK) {
1369 TRANS_LOGE(TRANS_STREAM,
1370 "FtConfigGet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1371 return std::move(StreamAttr());
1372 }
1373
1374 int valType = ValueType::UNKNOWN;
1375 for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1376 if (it->second != type) {
1377 continue;
1378 }
1379
1380 valType = optFuncMap_.at(it->first).valueType;
1381 break;
1382 }
1383
1384 if (valType != ValueType::UNKNOWN) {
1385 for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1386 if (it->second != type) {
1387 continue;
1388 }
1389
1390 valType = optFuncMap_.at(it->first).valueType;
1391 break;
1392 }
1393 }
1394
1395 if (valType == BOOL_TYPE) {
1396 return std::move(StreamAttr(!!intVal));
1397 }
1398
1399 return std::move(StreamAttr(intVal));
1400 }
1401
SetNonBlockMode(int fd,const StreamAttr & value)1402 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1403 {
1404 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1405 if (flags < 0) {
1406 TRANS_LOGE(TRANS_STREAM, "failed to get FtFcntl, flags=%{public}d", flags);
1407 flags = 0;
1408 }
1409 bool nonBlock = value.GetBoolValue();
1410
1411 flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1412 static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1413
1414 FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1415 if (res < 0) {
1416 TRANS_LOGE(TRANS_STREAM, "failed to set FtFcntl, res=%{public}d", res);
1417 return false;
1418 }
1419
1420 TRANS_LOGI(TRANS_STREAM, "Successfully to set fd=%{public}d, nonBlock=%{public}d", fd, nonBlock);
1421 return true;
1422 }
1423
GetNonBlockMode(int fd) const1424 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1425 {
1426 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1427 if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1428 return std::move(StreamAttr(true));
1429 }
1430
1431 return std::move(StreamAttr(false));
1432 }
1433
GetIp(int type) const1434 StreamAttr VtpStreamSocket::GetIp(int type) const
1435 {
1436 if (type == LOCAL_IP) {
1437 return std::move(StreamAttr(localIpPort_.ip));
1438 }
1439
1440 return std::move(StreamAttr(remoteIpPort_.ip));
1441 }
1442
GetPort(int type) const1443 StreamAttr VtpStreamSocket::GetPort(int type) const
1444 {
1445 if (type == LOCAL_PORT) {
1446 return std::move(StreamAttr(localIpPort_.port));
1447 }
1448 return std::move(StreamAttr(remoteIpPort_.port));
1449 }
1450
SetStreamType(int type,const StreamAttr & value)1451 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1452 {
1453 if (type != STREAM_TYPE_INT) {
1454 return false;
1455 }
1456
1457 streamType_ = value.GetIntValue();
1458 return true;
1459 }
1460
GetStreamType(int type) const1461 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1462 {
1463 if (type != STREAM_TYPE_INT) {
1464 return std::move(StreamAttr());
1465 }
1466
1467 return std::move(StreamAttr(streamType_));
1468 }
1469
SetStreamScene(int type,const StreamAttr & value)1470 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1471 {
1472 static_cast<void>(type);
1473 if (value.GetType() != INT_TYPE) {
1474 TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1475 return false;
1476 }
1477 scene_ = value.GetIntValue();
1478 TRANS_LOGI(TRANS_STREAM, "Set scene=%{public}d", scene_);
1479 return true;
1480 }
1481
SetStreamHeaderSize(int type,const StreamAttr & value)1482 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1483 {
1484 static_cast<void>(type);
1485 if (value.GetType() != INT_TYPE) {
1486 TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1487 return false;
1488 }
1489 streamHdrSize_ = value.GetIntValue();
1490 TRANS_LOGI(TRANS_STREAM, "Set headerSize=%{public}d", streamHdrSize_);
1491 return true;
1492 }
1493
NotifyStreamListener()1494 void VtpStreamSocket::NotifyStreamListener()
1495 {
1496 while (isStreamRecv_) {
1497 int streamNum = GetStreamNum();
1498 if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1499 TRANS_LOGW(TRANS_STREAM, "Too many data in receiver, streamNum=%{public}d", streamNum);
1500 }
1501
1502 auto stream = TakeStream();
1503 if (stream == nullptr) {
1504 TRANS_LOGE(TRANS_STREAM, "Pop stream failed");
1505 break;
1506 }
1507
1508 std::lock_guard<std::mutex> guard(streamSocketLock_);
1509 if (streamReceiver_ != nullptr) {
1510 TRANS_LOGD(TRANS_STREAM, "notify listener");
1511 streamReceiver_->OnStreamReceived(std::move(stream));
1512 TRANS_LOGD(TRANS_STREAM, "notify listener done.");
1513 }
1514 }
1515 TRANS_LOGI(TRANS_STREAM, "notify thread exit");
1516 }
1517
GetEncryptOverhead() const1518 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1519 {
1520 return OVERHEAD_LEN;
1521 }
1522
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1523 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1524 {
1525 if (in == nullptr || out == nullptr) {
1526 TRANS_LOGE(TRANS_STREAM, "param invalid");
1527 return SOFTBUS_INVALID_PARAM;
1528 }
1529 AesGcmCipherKey cipherKey = {0};
1530
1531 if (inLen - OVERHEAD_LEN > outLen) {
1532 TRANS_LOGE(TRANS_STREAM, "Encrypt invalid para.");
1533 return SOFTBUS_INVALID_PARAM;
1534 }
1535
1536 cipherKey.keyLen = SESSION_KEY_LENGTH;
1537 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1538 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1539 return SOFTBUS_MEM_ERR;
1540 }
1541
1542 int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1543 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1544 if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1545 TRANS_LOGE(TRANS_STREAM, "Encrypt Data fail. ret=%{public}d", ret);
1546 return SOFTBUS_ENCRYPT_ERR;
1547 }
1548 return outLen;
1549 }
1550
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1551 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1552 {
1553 if (in == nullptr || out == nullptr) {
1554 TRANS_LOGE(TRANS_STREAM, "param invalid");
1555 return SOFTBUS_INVALID_PARAM;
1556 }
1557 AesGcmCipherKey cipherKey = {0};
1558
1559 if (inLen - OVERHEAD_LEN > outLen) {
1560 TRANS_LOGE(TRANS_STREAM, "Decrypt invalid para.");
1561 return SOFTBUS_INVALID_PARAM;
1562 }
1563
1564 cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1565 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1566 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1567 return SOFTBUS_MEM_ERR;
1568 }
1569 int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1570 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1571 if (ret != SOFTBUS_OK) {
1572 TRANS_LOGE(TRANS_STREAM, "Decrypt Data fail. ret=%{public}d ", ret);
1573 return SOFTBUS_DECRYPT_ERR;
1574 }
1575
1576 return outLen;
1577 }
1578
SetMultiLayer(const void * para)1579 int32_t VtpStreamSocket::SetMultiLayer(const void *para)
1580 {
1581 int fd = GetStreamSocketFd(FD).GetIntValue();
1582 return VtpSetSocketMultiLayer(fd, &onStreamEvtCb_, para);
1583 }
1584
CreateServerProcessThread()1585 void VtpStreamSocket::CreateServerProcessThread()
1586 {
1587 auto self = this->GetSelf();
1588 std::thread([self]() {
1589 const std::string threadName = "OS_sntfStmLsn";
1590 pthread_setname_np(pthread_self(), threadName.c_str());
1591 self->NotifyStreamListener();
1592 }).detach();
1593
1594 std::thread([self]() {
1595 const std::string threadName = "OS_sdstyStmSkt";
1596 pthread_setname_np(pthread_self(), threadName.c_str());
1597 if (!self->Accept()) {
1598 self->DestroyStreamSocket();
1599 return;
1600 }
1601 self->DoStreamRecv();
1602 self->DestroyStreamSocket();
1603 }).detach();
1604
1605 bool &isDestroyed = isDestroyed_;
1606 std::thread([self, &isDestroyed]() {
1607 const std::string threadName = "OS_sfillStatic";
1608 pthread_setname_np(pthread_self(), threadName.c_str());
1609 while (!isDestroyed) {
1610 self->FillpAppStatistics();
1611 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1612 }
1613 }).detach();
1614 }
1615
CreateClientProcessThread()1616 void VtpStreamSocket::CreateClientProcessThread()
1617 {
1618 auto self = this->GetSelf();
1619 std::thread([self]() {
1620 const std::string threadName = "OS_cntfStmLsn";
1621 pthread_setname_np(pthread_self(), threadName.c_str());
1622 self->NotifyStreamListener();
1623 }).detach();
1624
1625 std::thread([self]() {
1626 const std::string threadName = "OS_cdstyStmSkt";
1627 pthread_setname_np(pthread_self(), threadName.c_str());
1628 self->DoStreamRecv();
1629 self->DestroyStreamSocket();
1630 }).detach();
1631
1632 bool &isDestroyed = isDestroyed_;
1633 std::thread([self, &isDestroyed]() {
1634 const std::string threadName = "OS_cfillStatic";
1635 pthread_setname_np(pthread_self(), threadName.c_str());
1636 while (!isDestroyed) {
1637 self->FillpAppStatistics();
1638 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1639 }
1640 }).detach();
1641 }
1642 } // namespace SoftBus
1643 } // namespace Communication
1644