1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "vtp_stream_socket.h"
17
18 #include <chrono>
19 #include <ifaddrs.h>
20 #include <memory>
21 #include <netinet/in.h>
22 #include <securec.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <thread>
26
27 #include "fillpinc.h"
28 #include "raw_stream_data.h"
29 #include "stream_common_data.h"
30 #include "session.h"
31 #include "softbus_adapter_timer.h"
32 #include "softbus_adapter_crypto.h"
33 #include "softbus_adapter_socket.h"
34 #include "softbus_errcode.h"
35 #include "softbus_log.h"
36 #include "softbus_trans_def.h"
37 #include "stream_depacketizer.h"
38 #include "stream_packetizer.h"
39
40 namespace Communication {
41 namespace SoftBus {
42 bool g_logOn = false;
43 const int FEED_BACK_PERIOD = 1; /* feedback period of fillp stream traffic statistics is 1s */
44 const int MS_PER_SECOND = 1000;
45 const int US_PER_MS = 1000;
46
47 namespace {
PrintOptionInfo(int type,const StreamAttr & value)48 void PrintOptionInfo(int type, const StreamAttr &value)
49 {
50 switch (value.GetType()) {
51 case INT_TYPE:
52 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
53 "Int option: type:%d, value:%d", type, value.GetIntValue());
54 break;
55 case BOOL_TYPE:
56 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
57 "Bool option: type:%d, value:%d", type, value.GetBoolValue());
58 break;
59 case STRING_TYPE:
60 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
61 "String option: type:%d, value:%s", type, value.GetStrValue().c_str());
62 break;
63 default:
64 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Wrong StreamAttr!");
65 (void)type;
66 }
67 }
68 } // namespace
69 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
70
71 std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
72 std::mutex VtpStreamSocket::g_streamSocketLockMapLock_;
73 std::map<int, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
74 std::mutex VtpStreamSocket::g_streamSocketMapLock_;
75
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)76 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo* frameInfo,
77 const Communication::SoftBus::StreamFrameInfo* streamFrameInfo)
78 {
79 frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
80 frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
81 frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
82 frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
83 frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
84 frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
85 }
86
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)87 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
88 {
89 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
90 if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
91 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamsocketlock for fd = %d already exists", fd);
92 return;
93 }
94
95 g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
96 }
97
AddStreamSocketListener(int fd,std::shared_ptr<VtpStreamSocket> streamreceiver)98 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
99 {
100 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
101 if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
102 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamreceiver for fd = %d already exists", fd);
103 return;
104 }
105
106 g_streamSocketMap.emplace(std::pair<int, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
107 }
108
RemoveStreamSocketLock(int fd)109 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
110 {
111 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
112 if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
113 g_streamSocketLockMap.erase(fd);
114 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamsocketlock for fd = %d success", fd);
115 } else {
116 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
117 "Streamsocketlock for fd = %d not exist in the map", fd);
118 }
119 return;
120 }
121
RemoveStreamSocketListener(int fd)122 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
123 {
124 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
125 if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
126 g_streamSocketMap.erase(fd);
127 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamreceiver for fd = %d success", fd);
128 } else {
129 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Streamreceiver for fd = %d not exist in the map", fd);
130 }
131 return;
132 }
133
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)134 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
135 {
136 OptionFunc fun = {
137 valueType, set, get
138 };
139 optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
140 }
141
VtpStreamSocket()142 VtpStreamSocket::VtpStreamSocket()
143 {
144 InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
145 InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
146 InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
147 InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
148 InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
149 InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
150 InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
151 InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
152 InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
153 InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
154 InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
155 InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
156 InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
157 &VtpStreamSocket::GetVtpStackConfig);
158 InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
159 &VtpStreamSocket::GetVtpStackConfig);
160 InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
161 &VtpStreamSocket::GetVtpStackConfig);
162 InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
163 &VtpStreamSocket::GetVtpStackConfig);
164 InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
165 &VtpStreamSocket::GetVtpStackConfig);
166 InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
167 &VtpStreamSocket::GetVtpStackConfig);
168 InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
169 &VtpStreamSocket::GetVtpStackConfig);
170 InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
171 &VtpStreamSocket::GetVtpStackConfig);
172 InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
173 &VtpStreamSocket::GetVtpStackConfig);
174 InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
175 &VtpStreamSocket::GetVtpStackConfig);
176 InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
177 &VtpStreamSocket::GetVtpStackConfig);
178 InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
179 &VtpStreamSocket::GetVtpStackConfig);
180 InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
181 InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
182 InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
183 InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
184
185 scene_ = UNKNOWN_SCENE;
186 }
187
~VtpStreamSocket()188 VtpStreamSocket::~VtpStreamSocket()
189 {
190 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "~VtpStreamSocket");
191 }
192
GetSelf()193 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
194 {
195 return shared_from_this();
196 }
197
HandleFillpFrameStats(int fd,const FtEventCbkInfo * info)198 int VtpStreamSocket::HandleFillpFrameStats(int fd, const FtEventCbkInfo *info)
199 {
200 StreamSendStats stats = {};
201 if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
202 sizeof(info->info.frameSendStats)) != EOK) {
203 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamStats info memcpy fail");
204 return -1;
205 }
206
207 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
208 auto itListener = g_streamSocketMap.find(fd);
209 if (itListener != g_streamSocketMap.end()) {
210 if (itListener->second->streamReceiver_ != nullptr) {
211 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "OnFrameStats enter");
212 itListener->second->streamReceiver_->OnFrameStats(&stats);
213 } else {
214 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamReceiver_ is nullptr");
215 }
216 } else {
217 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty in the map", fd);
218 }
219 return 0;
220 }
221
HandleRipplePolicy(int fd,const FtEventCbkInfo * info)222 int VtpStreamSocket::HandleRipplePolicy(int fd, const FtEventCbkInfo *info)
223 {
224 TrafficStats stats;
225 (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
226 if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
227 sizeof(info->info.trafficData.stats)) != EOK) {
228 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "RipplePolicy info memcpy fail");
229 return -1;
230 }
231 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
232 auto itListener = g_streamSocketMap.find(fd);
233 if (itListener != g_streamSocketMap.end()) {
234 if (itListener->second->streamReceiver_ != nullptr) {
235 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "OnRippleStats enter");
236 itListener->second->streamReceiver_->OnRippleStats(&stats);
237 } else {
238 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "OnRippleStats streamReceiver_ is nullptr");
239 }
240 } else {
241 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
242 "OnRippleStats streamReceiver for fd = %d is empty in the map", fd);
243 }
244 return 0;
245 }
246
247 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int fd,const FtEventCbkInfo * info,QosTv * metricList)248 void VtpStreamSocket::FillSupportDet(int fd, const FtEventCbkInfo *info, QosTv *metricList)
249 {
250 if (info == nullptr) {
251 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "stats info is nullptr");
252 return;
253 }
254 if (info->evt == FT_EVT_BW_DET) {
255 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
256 "[Metric Return]: Fillp bandwidth information of socket(%d) is returned", fd);
257 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
258 "[Metric Return]: Changed amount of current available bandwidth is: %d", info->info.bwInfo.bwStat);
259 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
260 "[Metric Return]: Current bandwidth for receiving data is: %d kbps", info->info.bwInfo.rate);
261 metricList->type = BANDWIDTH_ESTIMATE_VALUE;
262 metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
263 metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
264 }
265 if (info->evt == FT_EVT_JITTER_DET) {
266 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
267 "[Metric Return]: Fillp connection quality information of socket(%d) is returned", fd);
268 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
269 "[Metric Return]: Predeicted network condition is: %d", info->info.jitterInfo.jitterLevel);
270 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
271 "[Metric Return]: Current available receiving buffer is: %d ms", info->info.jitterInfo.bufferTime);
272 metricList->type = JITTER_DETECTION_VALUE;
273 metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
274 metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
275 }
276 }
277 #endif
278
279 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int fd,const FtEventCbkInfo * info)280 int VtpStreamSocket::FillpStatistics(int fd, const FtEventCbkInfo *info)
281 {
282 if (info == nullptr) {
283 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "stats info is nullptr");
284 return -1;
285 }
286 if (info->evt == FT_EVT_FRAME_STATS) {
287 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv fillp frame stats");
288 return HandleFillpFrameStats(fd, info);
289 } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
290 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv fillp traffic data");
291 return HandleRipplePolicy(fd, info);
292 }
293 #ifdef FILLP_SUPPORT_BW_DET
294 if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
295 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
296 int16_t tvCount = 1;
297 QosTv metricList = {};
298
299 FillSupportDet(fd, info, &metricList);
300 metricList.info.wifiChannelInfo = {};
301 metricList.info.frameStatusInfo = {};
302
303 std::lock_guard<std::mutex> guard(g_streamSocketLockMapLock_);
304 auto itLock = g_streamSocketLockMap.find(fd);
305 if (itLock != g_streamSocketLockMap.end()) {
306 std::lock_guard<std::mutex> guard(g_streamSocketMapLock_);
307 auto itListener = g_streamSocketMap.find(fd);
308 if (itListener != g_streamSocketMap.end()) {
309 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
310 std::lock_guard<std::mutex> guard(itLock->second);
311 itListener->second->OnQosEvent(eventId, tvCount, &metricList);
312 }).detach();
313 } else {
314 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty in the map", fd);
315 }
316 } else {
317 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamSocketLock for fd = %d is empty in the map", fd);
318 }
319 } else {
320 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
321 "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
322 return -1;
323 }
324 #endif
325 return 0;
326 }
327
FillpAppStatistics()328 void VtpStreamSocket::FillpAppStatistics()
329 {
330 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
331 int16_t tvCount = 1;
332 QosTv metricList = {};
333 FillpStatisticsPcb fillpPcbStats = {};
334 SoftBusSysTime fillpStatsGetTime = {0};
335
336 int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
337 SoftBusGetTime(&fillpStatsGetTime);
338 if (getStatisticsRet == 0) {
339 metricList.type = STREAM_TRAFFIC_STASTICS;
340 metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
341 MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
342 metricList.info.appStatistics.periodRecvBits =
343 static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
344 metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
345 metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
346 metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
347 metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
348 metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
349 metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
350 metricList.info.appStatistics.periodRecvPktLossHighPrecision =
351 fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
352 metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
353 metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
354 metricList.info.appStatistics.periodSendPktLossHighPrecision =
355 fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
356 metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
357 metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
358
359 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
360 "Succeed to get fillp statistics information for streamfd = %d", streamFd_);
361 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
362 "[Metric Return]: periodRtt is: %d", fillpPcbStats.appFcStastics.periodRtt);
363
364 std::lock_guard<std::mutex> guard(streamSocketLock_);
365
366 if (streamReceiver_ != nullptr) {
367 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
368 "[Metric Notify]: Fillp traffic statistics information of socket(%d) is notified", streamFd_);
369 streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
370 } else {
371 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty", streamFd_);
372 }
373 } else {
374 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
375 "Fail to get fillp statistics information for streamfd = %d, errorcode = %d", streamFd_, FtGetErrno());
376 }
377 }
378
CreateClient(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)379 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
380 {
381 int fd = CreateAndBindSocket(local);
382 if (fd == -1) {
383 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "CreateAndBindSocket failed, errorcode:%d", FtGetErrno());
384 DestroyStreamSocket();
385 return false;
386 }
387
388 sessionKey_.second = sessionKey.second;
389 if (sessionKey_.first == nullptr) {
390 sessionKey_.first = new uint8_t[sessionKey_.second];
391 }
392 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
393 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
394 return false;
395 }
396
397 streamType_ = streamType;
398 std::lock_guard<std::mutex> guard(streamSocketLock_);
399 streamFd_ = fd;
400 configCv_.notify_all();
401
402 SetDefaultConfig(fd);
403 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
404 "Success to create a client socket(%d) of stream type(%d)", fd, streamType);
405 return true;
406 }
407
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)408 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
409 std::pair<uint8_t*, uint32_t> sessionKey)
410 {
411 if (!CreateClient(local, streamType, sessionKey)) {
412 return false;
413 }
414 /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
415 #ifdef FILLP_SUPPORT_BW_DET
416 bool isServer = false;
417 EnableBwEstimationAlgo(streamFd_, isServer);
418 #endif
419
420 bool connectRet = Connect(remote);
421 if (connectRet) {
422 bool isServer = false;
423 RegisterMetricCallback(isServer); /* register the callback function */
424 }
425 return connectRet;
426 }
427
CreateServer(IpAndPort & local,int streamType,std::pair<uint8_t *,uint32_t> sessionKey)428 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey)
429 {
430 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "CreateVtpServer start");
431 listenFd_ = CreateAndBindSocket(local);
432 if (listenFd_ == -1) {
433 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "create listenFd failed, errorcode %d", FtGetErrno());
434 DestroyStreamSocket();
435 return false;
436 }
437
438 bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
439 if (ret != 0) {
440 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtListen failed, ret :%d errorcode %d", ret, FtGetErrno());
441 DestroyStreamSocket();
442 return false;
443 }
444
445 epollFd_ = FtEpollCreate();
446 if (epollFd_ < 0) {
447 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
448 DestroyStreamSocket();
449 return false;
450 }
451 isStreamRecv_ = true;
452 streamType_ = streamType;
453 sessionKey_.second = sessionKey.second;
454 if (sessionKey_.first == nullptr) {
455 sessionKey_.first = new uint8_t[sessionKey_.second];
456 }
457 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
458 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
459 return false;
460 }
461 auto self = this->GetSelf();
462 std::thread([self]() { self->NotifyStreamListener(); }).detach();
463
464 std::thread([self]() {
465 if (!self->Accept()) {
466 self->DestroyStreamSocket();
467 return;
468 }
469 self->DoStreamRecv();
470 self->DestroyStreamSocket();
471 }).detach();
472
473 bool &isDestroyed = isDestroyed_;
474 std::thread([self, &isDestroyed]() {
475 while (!isDestroyed) {
476 self->FillpAppStatistics();
477 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
478 }
479 }).detach();
480
481 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
482 "CreateServer end, listenFd:%d, epollFd:%d, streamType:%d", listenFd_, epollFd_, streamType_);
483 return true;
484 }
485
DestroyStreamSocket()486 void VtpStreamSocket::DestroyStreamSocket()
487 {
488 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket start");
489 std::lock_guard<std::mutex> guard(streamSocketLock_);
490 if (isDestroyed_) {
491 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "StreamSocket is already destroyed");
492 return;
493 }
494 if (listenFd_ != -1) {
495 FtClose(listenFd_);
496 listenFd_ = -1;
497 }
498
499 if (streamFd_ != -1) {
500 RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
501 RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
502 FtClose(streamFd_);
503 streamFd_ = -1;
504 }
505
506 if (epollFd_ != -1) {
507 FtClose(epollFd_);
508 epollFd_ = -1;
509 }
510
511 if (streamReceiver_ != nullptr) {
512 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket receiver delete");
513 streamReceiver_->OnStreamStatus(STREAM_CLOSED);
514 streamReceiver_.reset();
515 }
516
517 QuitStreamBuffer();
518 vtpInstance_->UpdateSocketStreamCount(false);
519 isDestroyed_ = true;
520 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket end");
521 }
522
Connect(const IpAndPort & remote)523 bool VtpStreamSocket::Connect(const IpAndPort &remote)
524 {
525 if (remote.ip.empty()) {
526 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "remote addr error, ip is nullptr");
527 DestroyStreamSocket();
528 return false;
529 }
530
531 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
532 "Connect to server(server port:%d)", remote.port);
533 remoteIpPort_ = remote;
534
535 struct sockaddr_in remoteSockAddr;
536 remoteSockAddr.sin_family = AF_INET;
537 remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
538 remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
539
540 int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
541 if (ret != 0) {
542 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtConnect failed, ret :%d, errorno: %d", ret, FtGetErrno());
543 DestroyStreamSocket();
544 return false;
545 }
546
547 epollFd_ = FtEpollCreate();
548 if (epollFd_ < 0) {
549 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
550 DestroyStreamSocket();
551 return false;
552 }
553
554 if (SetSocketEpollMode(streamFd_) != ERR_OK) {
555 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", streamFd_);
556 DestroyStreamSocket();
557 return false;
558 }
559 isStreamRecv_ = true;
560 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to connect remote, and create a thread to recv data.");
561
562 auto self = this->GetSelf();
563 std::thread([self]() { self->NotifyStreamListener(); }).detach();
564
565 std::thread([self]() {
566 self->DoStreamRecv();
567 self->DestroyStreamSocket();
568 }).detach();
569
570 bool &isDestroyed = isDestroyed_;
571 std::thread([self, &isDestroyed]() {
572 while (!isDestroyed) {
573 self->FillpAppStatistics();
574 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
575 }
576 }).detach();
577 return true;
578 }
579
Send(std::unique_ptr<IStream> stream)580 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
581 {
582 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "send in..., streamType:%d, data size:%zd, ext size:%zd", streamType_,
583 stream->GetBufferLen(), stream->GetExtBufferLen());
584
585 if (!isBlocked_) {
586 isBlocked_ = true;
587 if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
588 return false;
589 }
590 }
591
592 int ret = -1;
593 std::unique_ptr<char[]> data = nullptr;
594 ssize_t len = 0;
595
596 if (streamType_ == RAW_STREAM) {
597 data = stream->GetBuffer();
598 len = stream->GetBufferLen();
599
600 ret = FtSend(streamFd_, data.get(), len, 0);
601 } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
602 const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
603 if (streamFrameInfo == nullptr) {
604 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamFrameInfo == nullptr");
605 return false;
606 }
607
608 StreamPacketizer packet(streamType_, std::move(stream));
609 auto plainData = packet.PacketizeStream();
610 if (plainData == nullptr) {
611 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "PacketizeStream failed");
612 return false;
613 }
614 len = packet.GetPacketLen() + GetEncryptOverhead();
615 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
616 "packet.GetPacketLen() = %zd, GetEncryptOverhead() = %zd", packet.GetPacketLen(), GetEncryptOverhead());
617 data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
618 ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
619 if (encLen != len) {
620 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
621 "encrypted failed, dataLen = %zd, encryptLen = %zd", len, encLen);
622 return false;
623 }
624 InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
625 len += FRAME_HEADER_LEN;
626
627 FrameInfo frameInfo;
628 ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
629 ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
630 }
631
632 if (ret == -1) {
633 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "send failed, errorno: %d", FtGetErrno());
634 return false;
635 }
636
637 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "send out..., streamType:%d, data size:%zd", streamType_, len);
638 return true;
639 }
640
SetOption(int type,const StreamAttr & value)641 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
642 {
643 PrintOptionInfo(type, value);
644 auto it = optFuncMap_.find(type);
645 if (it == optFuncMap_.end()) {
646 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "not found type = %d", type);
647 return false;
648 }
649
650 if (value.GetType() != it->second.valueType) {
651 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN,
652 "type = %d, value.type = %d", value.GetType(), it->second.valueType);
653 return false;
654 }
655
656 MySetFunc set = it->second.set;
657 if (set == nullptr) {
658 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "set is nullptr");
659 return false;
660 }
661
662 if (type == NON_BLOCK || type == TOS) {
663 return (this->*set)(static_cast<int>(streamFd_), value);
664 }
665
666 auto outerIt = FILLP_TYPE_MAP.find(type);
667 if (outerIt != FILLP_TYPE_MAP.end()) {
668 return (this->*set)(outerIt->second, value);
669 }
670
671 auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
672 if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
673 return (this->*set)(innerIt->second, value);
674 }
675
676 return (this->*set)(static_cast<int>(type), value);
677 }
678
GetOption(int type) const679 StreamAttr VtpStreamSocket::GetOption(int type) const
680 {
681 StreamAttr attr {};
682 auto it = optFuncMap_.find(type);
683 if (it != optFuncMap_.end()) {
684 MyGetFunc get = it->second.get;
685 if (get == nullptr) {
686 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Can not get option:%d", type);
687 return std::move(StreamAttr());
688 }
689 if (type == NON_BLOCK) {
690 attr = (this->*get)(static_cast<int>(streamFd_));
691 } else {
692 attr = (this->*get)(static_cast<int>(type));
693 }
694 }
695
696 PrintOptionInfo(type, attr);
697 return attr;
698 }
699
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)700 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
701 {
702 if (receiver == nullptr) {
703 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "receiver is nullptr");
704 return false;
705 }
706
707 std::lock_guard<std::mutex> guard(streamSocketLock_);
708 streamReceiver_ = receiver;
709 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set receiver success");
710 return true;
711 }
712
InitVtpInstance(const std::string & pkgName)713 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
714 {
715 return vtpInstance_->InitVtp(pkgName);
716 }
717
DestroyVtpInstance(const std::string & pkgName)718 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
719 {
720 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance start");
721 vtpInstance_->DestroyVtp(pkgName);
722 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance end");
723 }
724
CreateAndBindSocket(IpAndPort & local)725 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local)
726 {
727 localIpPort_ = local;
728 vtpInstance_->UpdateSocketStreamCount(true);
729 if (local.ip.empty()) {
730 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "ip is empty");
731 return -1;
732 }
733
734 int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
735 if (sockFd == -1) {
736 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtSocket failed, errorcode = %d", FtGetErrno());
737 return -1;
738 }
739
740 // bind
741 sockaddr_in localSockAddr = {0};
742 localSockAddr.sin_family = AF_INET;
743 localSockAddr.sin_port = htons((short)local.port);
744 localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
745
746 socklen_t localAddrLen = sizeof(localSockAddr);
747 int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
748 if (ret == -1) {
749 FtClose(sockFd);
750 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtBind failed, errorcode %d", FtGetErrno());
751 return -1;
752 }
753
754 // 获取port
755 ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
756 if (ret != ERR_OK) {
757 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "getsockname error ret: %d, errorcode :%d", ret, FtGetErrno());
758 FtClose(sockFd);
759 return -1;
760 }
761
762 char host[ADDR_MAX_SIZE];
763 localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
764 localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
765 local.port = localIpPort_.port;
766
767 if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
768 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketBoundInner failed, errorcode :%d", FtGetErrno());
769 }
770 return sockFd;
771 }
772
773
EnableBwEstimationAlgo(int streamFd,bool isServer) const774 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
775 {
776 #ifdef FILLP_SUPPORT_BW_DET
777 int errBwDet;
778 if (isServer) {
779 int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
780 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
781 &enableBwDet, sizeof(enableBwDet));
782 } else {
783 int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
784 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
785 &enableBwDet, sizeof(enableBwDet));
786 }
787 if (errBwDet < 0) {
788 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
789 "Fail to enable bandwidth estimation algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
790 return true;
791 } else {
792 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
793 "Success to enable bandwidth estimation algorithm for stream: %d", streamFd);
794 return false;
795 }
796 #else
797 return true;
798 #endif
799 }
800
EnableJitterDetectionAlgo(int streamFd) const801 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
802 {
803 #ifdef FILLP_SUPPORT_CQE
804 int32_t enableCQE = FILLP_CQE_ENABLE;
805 auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
806 if (errCQE < 0) {
807 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
808 "Fail to enable CQE algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
809 return true;
810 } else {
811 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
812 "Success to enable CQE algorithm for stream: %d", streamFd);
813 return false;
814 }
815 #else
816 return true;
817 #endif
818 }
819
EnableDirectlySend(int streamFd) const820 bool VtpStreamSocket::EnableDirectlySend(int streamFd) const
821 {
822 int32_t enable = 1;
823 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
824 if (ret < 0) {
825 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
826 "Fail to enable direct send for stream: %d, errorcode = %d", streamFd, FtGetErrno());
827 return false;
828 }
829 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to enable direct send for stream: %d", streamFd);
830 return true;
831 }
832
EnableSemiReliable(int streamFd) const833 bool VtpStreamSocket::EnableSemiReliable(int streamFd) const
834 {
835 int32_t enable = 1;
836 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
837 if (ret < 0) {
838 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
839 "Fail to enable direct send for stream: %d, errorcode = %d", streamFd, FtGetErrno());
840 return false;
841 }
842 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to enable direct send for stream: %d", streamFd);
843 return true;
844 }
845
RegisterMetricCallback(bool isServer)846 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
847 {
848 VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
849 auto self = this->GetSelf();
850 VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
851 int regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
852 int value = 1;
853 auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
854 if (err < 0) {
855 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "fail to set socket binding to device");
856 return;
857 }
858 SoftBusLog(SOFTBUS_LOG_LNN, SOFTBUS_LOG_INFO, "FtSetSockOpt start success");
859 if (isServer) {
860 if (regStatisticsRet == 0) {
861 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
862 "Success to register the stream callback function at server side with Fd = %d", streamFd_);
863 } else {
864 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
865 "Fail to register the stream callback function at server side with Fd = %d, errcode:%d",
866 streamFd_, regStatisticsRet);
867 }
868 } else {
869 if (regStatisticsRet == 0) {
870 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
871 "Success to register the stream callback function at client side with Fd = %d", streamFd_);
872 } else {
873 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
874 "Fail to register the stream callback function at client side with Fd = %d, errcode:%d",
875 streamFd_, regStatisticsRet);
876 }
877 }
878 }
879
Accept()880 bool VtpStreamSocket::Accept()
881 {
882 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "accept start");
883
884 auto fd = FtAccept(listenFd_, nullptr, nullptr);
885 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept streamFd:%d", fd);
886 if (fd == -1) {
887 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "errorcode = %d", FtGetErrno());
888 return false;
889 }
890
891 sockaddr remoteAddr {};
892 socklen_t remoteAddrLen = sizeof(remoteAddr);
893 auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
894 if (ret != ERR_OK) {
895 FtClose(fd);
896 return false;
897 }
898
899 char host[ADDR_MAX_SIZE];
900 if (remoteAddr.sa_family == AF_INET) {
901 auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
902 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
903 remoteIpPort_.port = v4Addr->sin_port;
904 } else {
905 auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
906 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
907 remoteIpPort_.port = v6Addr->sin6_port;
908 }
909
910 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
911 "Accept a client(server port:%d)", remoteIpPort_.port);
912 SetDefaultConfig(fd);
913
914 if (SetSocketEpollMode(fd) != ERR_OK) {
915 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", fd);
916 FtClose(fd);
917 return false;
918 }
919
920 std::lock_guard<std::mutex> guard(streamSocketLock_);
921 streamFd_ = fd;
922 configCv_.notify_all();
923
924 if (streamReceiver_ != nullptr) {
925 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify stream connected!");
926 streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
927 }
928
929 bool isServer = true;
930 RegisterMetricCallback(isServer); /* register the callback function */
931 /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
932 #ifdef FILLP_SUPPORT_BW_DET
933 EnableBwEstimationAlgo(streamFd_, isServer);
934 #endif
935 #ifdef FILLP_SUPPORT_CQE
936 EnableJitterDetectionAlgo(streamFd_);
937 #endif
938
939 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept success!");
940 return true;
941 }
942
EpollTimeout(int fd,int timeout)943 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
944 {
945 struct SpungeEpollEvent events[MAX_EPOLL_NUM];
946 (void)memset_s(events, sizeof(events), 0, sizeof(events));
947 while (true) {
948 FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
949 if (fdNum <= 0) {
950 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
951 "FtEpollWait failed, ret = %d, errno = %d", fdNum, FtGetErrno());
952 return -FtGetErrno();
953 }
954
955 for (FILLP_INT i = 0; i < fdNum; i++) {
956 if (events[i].data.fd != fd) {
957 continue;
958 }
959
960 if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
961 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
962 "EpollTimeout, something may be wrong in this socket, fd = %d, events = %u", fd,
963 (unsigned int)events[i].events);
964 return -1;
965 }
966
967 if (events[i].events & SPUNGE_EPOLLIN) {
968 return 0;
969 }
970 }
971 }
972 }
973
SetSocketEpollMode(int fd)974 int VtpStreamSocket::SetSocketEpollMode(int fd)
975 {
976 if (!SetNonBlockMode(fd, StreamAttr(true))) {
977 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetNonBlockMode failed, errno = %d", FtGetErrno());
978 return -1;
979 }
980
981 struct SpungeEpollEvent event = {0};
982 event.events = SPUNGE_EPOLLIN;
983 event.data.fd = fd;
984
985 auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
986 if (ret != ERR_OK) {
987 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtEpollCtl failed, ret = %d, errno = %d", ret, FtGetErrno());
988 return ret;
989 }
990
991 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "SetNonBlockMode success");
992 return 0;
993 }
994
InsertBufferLength(int num,int length,uint8_t * output) const995 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
996 {
997 for (int i = 0; i < length; i++) {
998 output[length - 1 - i] = static_cast<unsigned int>(
999 ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1000 }
1001 }
1002
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1003 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1004 {
1005 std::unique_ptr<IStream> stream = nullptr;
1006 switch (streamType_) {
1007 case VIDEO_SLICE_STREAM:
1008 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "do not support VIDEO_SLICE_STREAM type = %d", streamType_);
1009 break;
1010 case COMMON_VIDEO_STREAM:
1011 case COMMON_AUDIO_STREAM:
1012 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1013 "streamType = %d, seqnum=%d, streamid=%d", streamType_, info.seqNum, info.streamId);
1014 stream = IStream::MakeCommonStream(data, info);
1015 break;
1016 case RAW_STREAM:
1017 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "streamType = %d", streamType_);
1018 stream = IStream::MakeRawStream(data, info);
1019 break;
1020 default:
1021 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "do not support type = %d", streamType_);
1022 break;
1023 }
1024 if (stream == nullptr) {
1025 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "IStream construct error");
1026 return nullptr;
1027 }
1028
1029 return stream;
1030 }
1031
RecvStreamLen()1032 int VtpStreamSocket::RecvStreamLen()
1033 {
1034 int hdrSize = FRAME_HEADER_LEN;
1035 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1036 hdrSize = streamHdrSize_;
1037 }
1038
1039 int len = -1;
1040 int timeout = -1;
1041 auto buffer = std::make_unique<char[]>(hdrSize);
1042 if (EpollTimeout(streamFd_, timeout) == 0) {
1043 do {
1044 len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1045 } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1046 }
1047 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv frame header, len = %d, scene:%d", len, scene_);
1048
1049 if (len <= 0) {
1050 return -1;
1051 }
1052
1053 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1054 std::lock_guard<std::mutex> guard(streamSocketLock_);
1055 if (streamReceiver_ != nullptr) {
1056 return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1057 }
1058 }
1059
1060 return ntohl(*reinterpret_cast<int *>(buffer.get()));
1061 }
1062
DoStreamRecv()1063 void VtpStreamSocket::DoStreamRecv()
1064 {
1065 while (isStreamRecv_) {
1066 std::unique_ptr<char[]> dataBuffer = nullptr;
1067 std::unique_ptr<char[]> extBuffer = nullptr;
1068 int extLen = 0;
1069 StreamFrameInfo info = {};
1070 int dataLength = 0;
1071 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv stream");
1072 dataLength = VtpStreamSocket::RecvStreamLen();
1073 if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1074 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame length error, dataLength = %d", dataLength);
1075 break;
1076 }
1077 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1078 "recv a new frame, dataLength = %d, stream type:%d", dataLength, streamType_);
1079 dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1080
1081 if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1082 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv common stream");
1083 int decryptedLength = dataLength;
1084 auto decryptedBuffer = std::move(dataBuffer);
1085
1086 int plainDataLength = decryptedLength - GetEncryptOverhead();
1087 std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1088 ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1089 if (decLen != plainDataLength) {
1090 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1091 "Decrypt failed, dataLength = %d, decryptedLen = %zd", plainDataLength, decLen);
1092 break;
1093 }
1094 auto header = plainData.get();
1095 StreamDepacketizer decode(streamType_);
1096 decode.DepacketizeHeader(header);
1097
1098 auto buffer = plainData.get() + sizeof(CommonHeader);
1099 decode.DepacketizeBuffer(buffer);
1100
1101 extBuffer = decode.GetUserExt();
1102 extLen = decode.GetUserExtSize();
1103 info.seqNum = decode.GetSeqNum();
1104 info.streamId = decode.GetStreamId();
1105 dataBuffer = decode.GetData();
1106 dataLength = decode.GetDataLength();
1107 if (dataLength <= 0) {
1108 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1109 "common depacketize error, dataLength = %d", dataLength);
1110 break;
1111 }
1112 }
1113
1114 StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1115 std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1116 if (stream == nullptr) {
1117 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "MakeStreamData failed, stream == nullptr");
1118 break;
1119 }
1120
1121 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1122 "recv frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
1123
1124 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1125 std::lock_guard<std::mutex> guard(streamSocketLock_);
1126 if (streamReceiver_ != nullptr) {
1127 streamReceiver_->OnStreamReceived(std::move(stream));
1128 continue;
1129 }
1130 }
1131
1132 PutStream(std::move(stream));
1133 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
1134 "put frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
1135 }
1136 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv thread exit");
1137 }
1138
RecvStream(int dataLength)1139 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int dataLength)
1140 {
1141 auto buffer = std::make_unique<char[]>(dataLength);
1142 int recvLen = 0;
1143 while (recvLen < dataLength) {
1144 int ret = -1;
1145 int timeout = -1;
1146
1147 if (EpollTimeout(streamFd_, timeout) == 0) {
1148 do {
1149 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1150 } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1151 }
1152
1153 if (ret == -1) {
1154 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame failed, errno: %d", FtGetErrno());
1155 return nullptr;
1156 }
1157
1158 recvLen += ret;
1159 }
1160 return std::unique_ptr<char[]>(buffer.release());
1161 }
1162
SetDefaultConfig(int fd)1163 void VtpStreamSocket::SetDefaultConfig(int fd)
1164 {
1165 if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1166 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "SetIpTos failed");
1167 }
1168
1169 if (!EnableDirectlySend(fd)) {
1170 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "EnableDirectlySend failed");
1171 }
1172
1173 if (!EnableSemiReliable(fd)) {
1174 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "EnableSemiReliable failed");
1175 }
1176
1177 FILLP_BOOL enable = 1;
1178 if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1179 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set differ transmit failed");
1180 }
1181
1182 if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1183 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set recv buff failed");
1184 }
1185
1186 if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1187 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send buff failed");
1188 }
1189
1190 if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1191 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set recv cache failed");
1192 }
1193
1194 if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1195 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send cache failed");
1196 }
1197 }
1198
SetIpTos(int fd,const StreamAttr & tos)1199 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1200 {
1201 auto tmp = tos.GetIntValue();
1202 if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1203 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetIpTos wrong! fd=%d, errorcode=%d", fd, FtGetErrno());
1204 return false;
1205 }
1206
1207 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to set ip tos: fd=%d, tos=%d", fd, tmp);
1208 return true;
1209 }
1210
GetIpTos(int type) const1211 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1212 {
1213 static_cast<void>(type);
1214 int tos;
1215 int size = sizeof(tos);
1216
1217 if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1218 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtGetSockOpt errorcode = %d", FtGetErrno());
1219 return std::move(StreamAttr());
1220 }
1221
1222 return std::move(StreamAttr(tos));
1223 }
1224
GetStreamSocketFd(int type) const1225 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1226 {
1227 static_cast<void>(type);
1228 return std::move(StreamAttr(streamFd_));
1229 }
1230
GetListenSocketFd(int type) const1231 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1232 {
1233 static_cast<void>(type);
1234 return std::move(StreamAttr(listenFd_));
1235 }
1236
SetSocketBoundInner(int fd,std::string ip) const1237 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1238 {
1239 auto boundIp = (ip == "") ? localIpPort_.ip : ip;
1240 struct ifaddrs *ifList = nullptr;
1241 if (getifaddrs(&ifList) < 0) {
1242 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1243 "get interface address return error %d (%s)", errno, strerror(errno));
1244 return false;
1245 }
1246
1247 struct ifaddrs *ifa = nullptr;
1248 for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1249 if (ifa->ifa_addr == nullptr) {
1250 continue;
1251 }
1252 if (ifa->ifa_addr->sa_family != AF_INET) {
1253 continue;
1254 }
1255
1256 char host[ADDR_MAX_SIZE];
1257 std::string devName(ifa->ifa_name);
1258 if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1259 host, ADDR_MAX_SIZE)) == 0) {
1260 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "current use interface %s to bind to socket", ifa->ifa_name);
1261 auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1262 if (err < 0) {
1263 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "fail to set socket binding to device");
1264 freeifaddrs(ifList);
1265 return false;
1266 }
1267 break;
1268 }
1269 }
1270 freeifaddrs(ifList);
1271
1272 return true;
1273 }
1274
SetSocketBindToDevices(int type,const StreamAttr & ip)1275 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1276 {
1277 static_cast<void>(type);
1278 auto tmp = ip.GetStrValue();
1279 auto boundIp = (tmp == "") ? localIpPort_.ip : tmp;
1280 return SetSocketBoundInner(streamFd_, boundIp);
1281 }
1282
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1283 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1284 {
1285 std::unique_lock<std::mutex> lock(streamSocketLock_);
1286 if (streamFd_ == -1) {
1287 configCv_.wait(lock, [this] { return streamFd_ != -1; });
1288 }
1289 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config, streamFd = %d", streamFd_);
1290 return SetVtpStackConfig(type, value);
1291 }
1292
SetVtpStackConfig(int type,const StreamAttr & value)1293 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1294 {
1295 if (streamFd_ == -1) {
1296 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config when streamFd is legal");
1297 auto self = GetSelf();
1298 std::thread([self, type, value]() { self->SetVtpStackConfigDelayed(type, value); }).detach();
1299 return true;
1300 }
1301
1302 if (value.GetType() == INT_TYPE) {
1303 int intVal = value.GetIntValue();
1304 int ret = FtConfigSet(type, &intVal, &streamFd_);
1305 if (ret != 0) {
1306 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1307 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1308 return false;
1309 }
1310
1311 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1312 "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, intVal);
1313 return true;
1314 }
1315
1316 if (value.GetType() == BOOL_TYPE) {
1317 bool flag = value.GetBoolValue();
1318 int ret = FtConfigSet(type, &flag, &streamFd_);
1319 if (ret != 0) {
1320 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1321 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1322 return false;
1323 }
1324
1325 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1326 "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, flag);
1327 return true;
1328 }
1329
1330 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "UNKNOWN TYPE!");
1331 return false;
1332 }
1333
GetVtpStackConfig(int type) const1334 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1335 {
1336 int intVal = -1;
1337 int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1338 int ret = FtConfigGet(type, &intVal, &configFd);
1339 if (ret != 0) {
1340 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1341 "FtConfigGet failed, type = %d, errorcode = %d", type, FtGetErrno());
1342 return std::move(StreamAttr());
1343 }
1344
1345 int valType = ValueType::UNKNOWN;
1346 for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1347 if (it->second != type) {
1348 continue;
1349 }
1350
1351 valType = optFuncMap_.at(it->first).valueType;
1352 break;
1353 }
1354
1355 if (valType != ValueType::UNKNOWN) {
1356 for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1357 if (it->second != type) {
1358 continue;
1359 }
1360
1361 valType = optFuncMap_.at(it->first).valueType;
1362 break;
1363 }
1364 }
1365
1366 if (valType == BOOL_TYPE) {
1367 return std::move(StreamAttr(!!intVal));
1368 }
1369
1370 return std::move(StreamAttr(intVal));
1371 }
1372
SetNonBlockMode(int fd,const StreamAttr & value)1373 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1374 {
1375 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1376 if (flags < 0) {
1377 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to get FtFcntl, flags = %d", flags);
1378 flags = 0;
1379 }
1380 bool nonBlock = value.GetBoolValue();
1381
1382 flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1383 static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1384
1385 FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1386 if (res < 0) {
1387 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to set FtFcntl, res = %d", res);
1388 return false;
1389 }
1390
1391 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Successfully to set fd(%d) nonBlock mode = %d", fd, nonBlock);
1392 return true;
1393 }
1394
GetNonBlockMode(int fd) const1395 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1396 {
1397 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1398 if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1399 return std::move(StreamAttr(true));
1400 }
1401
1402 return std::move(StreamAttr(false));
1403 }
1404
GetIp(int type) const1405 StreamAttr VtpStreamSocket::GetIp(int type) const
1406 {
1407 if (type == LOCAL_IP) {
1408 return std::move(StreamAttr(localIpPort_.ip));
1409 }
1410
1411 return std::move(StreamAttr(remoteIpPort_.ip));
1412 }
1413
GetPort(int type) const1414 StreamAttr VtpStreamSocket::GetPort(int type) const
1415 {
1416 if (type == LOCAL_PORT) {
1417 return std::move(StreamAttr(localIpPort_.port));
1418 }
1419 return std::move(StreamAttr(remoteIpPort_.port));
1420 }
1421
SetStreamType(int type,const StreamAttr & value)1422 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1423 {
1424 if (type != STREAM_TYPE_INT) {
1425 return false;
1426 }
1427
1428 streamType_ = value.GetIntValue();
1429 return true;
1430 }
1431
GetStreamType(int type) const1432 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1433 {
1434 if (type != STREAM_TYPE_INT) {
1435 return std::move(StreamAttr());
1436 }
1437
1438 return std::move(StreamAttr(streamType_));
1439 }
1440
SetStreamScene(int type,const StreamAttr & value)1441 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1442 {
1443 static_cast<void>(type);
1444 if (value.GetType() != INT_TYPE) {
1445 return false;
1446 }
1447 scene_ = value.GetIntValue();
1448 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set scene to %d", scene_);
1449 return true;
1450 }
1451
SetStreamHeaderSize(int type,const StreamAttr & value)1452 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1453 {
1454 static_cast<void>(type);
1455 if (value.GetType() != INT_TYPE) {
1456 return false;
1457 }
1458 streamHdrSize_ = value.GetIntValue();
1459 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set header size to %d", streamHdrSize_);
1460 return true;
1461 }
1462
NotifyStreamListener()1463 void VtpStreamSocket::NotifyStreamListener()
1464 {
1465 while (isStreamRecv_) {
1466 int streamNum = GetStreamNum();
1467 if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1468 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Too many data in receiver, num = %d", streamNum);
1469 }
1470
1471 auto stream = TakeStream();
1472 if (stream == nullptr) {
1473 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Pop stream failed");
1474 break;
1475 }
1476
1477 std::lock_guard<std::mutex> guard(streamSocketLock_);
1478 if (streamReceiver_ != nullptr) {
1479 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener");
1480 streamReceiver_->OnStreamReceived(std::move(stream));
1481 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener done.");
1482 }
1483 }
1484 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify thread exit");
1485 }
1486
GetEncryptOverhead() const1487 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1488 {
1489 return OVERHEAD_LEN;
1490 }
1491
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1492 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1493 {
1494 AesGcmCipherKey cipherKey = {0};
1495
1496 if (inLen - OVERHEAD_LEN > outLen) {
1497 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt invalid para.");
1498 return SOFTBUS_ERR;
1499 }
1500
1501 cipherKey.keyLen = SESSION_KEY_LENGTH;
1502 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1503 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1504 return SOFTBUS_ERR;
1505 }
1506
1507 int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1508 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1509 if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1510 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt Data fail. %d", ret);
1511 return SOFTBUS_ENCRYPT_ERR;
1512 }
1513 return outLen;
1514 }
1515
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1516 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1517 {
1518 AesGcmCipherKey cipherKey = {0};
1519
1520 if (inLen - OVERHEAD_LEN > outLen) {
1521 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt invalid para.");
1522 return SOFTBUS_ERR;
1523 }
1524
1525 cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1526 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1527 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1528 return SOFTBUS_ERR;
1529 }
1530 int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1531 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1532 if (ret != SOFTBUS_OK) {
1533 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt Data fail. %d ", ret);
1534 return SOFTBUS_DECRYPT_ERR;
1535 }
1536
1537 return outLen;
1538 }
1539 } // namespace SoftBus
1540 } // namespace Communication
1541