• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "vtp_stream_socket.h"
17 
18 #include <chrono>
19 #include <ifaddrs.h>
20 #include <memory>
21 #include <netinet/in.h>
22 #include <securec.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <thread>
26 
27 #include "fillpinc.h"
28 #include "raw_stream_data.h"
29 #include "session.h"
30 #include "softbus_adapter_crypto.h"
31 #include "softbus_errcode.h"
32 #include "softbus_log.h"
33 #include "stream_depacketizer.h"
34 #include "stream_packetizer.h"
35 
36 namespace Communication {
37 namespace SoftBus {
38 bool g_logOn = false;
39 const int FEED_BACK_PERIOD = 1;  /* feedback period of fillp stream traffic statistics is 1s */
40 const int MS_PER_SECOND = 1000;
41 const int US_PER_MS = 1000;
42 
43 namespace {
PrintOptionInfo(int type,const StreamAttr & value)44 void PrintOptionInfo(int type, const StreamAttr &value)
45 {
46     switch (value.GetType()) {
47         case INT_TYPE:
48             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
49                 "Int option: type:%d, value:%d", type, value.GetIntValue());
50             break;
51         case BOOL_TYPE:
52             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
53                 "Bool option: type:%d, value:%d", type, value.GetBoolValue());
54             break;
55         case STRING_TYPE:
56             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
57                 "String option: type:%d, value:%s", type, value.GetStrValue().c_str());
58             break;
59         default:
60             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Wrong StreamAttr!");
61             (void)type;
62     }
63 }
64 } // namespace
65 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
66 
67 std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
68 std::map<int, std::shared_ptr<IStreamSocketListener>> VtpStreamSocket::g_streamReceiverMap;
69 
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)70 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
71 {
72     if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
73         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamsocketlock for fd = %d already exists", fd);
74         return;
75     }
76 
77     g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
78 }
79 
AddStreamSocketListener(int fd,std::shared_ptr<IStreamSocketListener> streamreceiver)80 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<IStreamSocketListener> streamreceiver)
81 {
82     if (!g_streamReceiverMap.empty() && g_streamReceiverMap.find(fd) != g_streamReceiverMap.end()) {
83         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamreceiver for fd = %d already exists", fd);
84         return;
85     }
86 
87     g_streamReceiverMap.emplace(std::pair<int, std::shared_ptr<IStreamSocketListener>>(fd, streamreceiver));
88 }
89 
RemoveStreamSocketLock(int fd)90 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
91 {
92     if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
93         g_streamSocketLockMap.erase(fd);
94         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamsocketlock for fd = %d success", fd);
95     } else {
96         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
97             "Streamsocketlock for fd = %d not exist in the map", fd);
98     }
99     return;
100 }
101 
RemoveStreamSocketListener(int fd)102 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
103 {
104     if (g_streamReceiverMap.find(fd) != g_streamReceiverMap.end()) {
105         g_streamReceiverMap.erase(fd);
106         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamreceiver for fd = %d success", fd);
107     } else {
108         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Streamreceiver for fd = %d not exist in the map", fd);
109     }
110     return;
111 }
112 
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)113 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
114 {
115     OptionFunc fun = {
116         valueType, set, get
117     };
118     optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
119 }
120 
VtpStreamSocket()121 VtpStreamSocket::VtpStreamSocket()
122 {
123     InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
124     InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
125     InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
126     InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
127     InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
128     InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
129     InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
130     InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
131     InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
132     InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
133     InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
134     InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
135     InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
136         &VtpStreamSocket::GetVtpStackConfig);
137     InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
138         &VtpStreamSocket::GetVtpStackConfig);
139     InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
140         &VtpStreamSocket::GetVtpStackConfig);
141     InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
142         &VtpStreamSocket::GetVtpStackConfig);
143     InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
144         &VtpStreamSocket::GetVtpStackConfig);
145     InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
146         &VtpStreamSocket::GetVtpStackConfig);
147     InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
148         &VtpStreamSocket::GetVtpStackConfig);
149     InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
150         &VtpStreamSocket::GetVtpStackConfig);
151     InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
152         &VtpStreamSocket::GetVtpStackConfig);
153     InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
154         &VtpStreamSocket::GetVtpStackConfig);
155     InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
156         &VtpStreamSocket::GetVtpStackConfig);
157     InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
158         &VtpStreamSocket::GetVtpStackConfig);
159     InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
160     InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
161     InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
162     InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
163 
164     scene_ = UNKNOWN_SCENE;
165 }
166 
~VtpStreamSocket()167 VtpStreamSocket::~VtpStreamSocket()
168 {
169     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "~VtpStreamSocket");
170 }
171 
GetSelf()172 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
173 {
174     return shared_from_this();
175 }
176 
177 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpBwAndJitterStatistics(int fd,const FtEventCbkInfo * info)178 int VtpStreamSocket::FillpBwAndJitterStatistics(int fd, const FtEventCbkInfo *info)
179 {
180 #if (defined(FILLP_SUPPORT_BW_DET) && defined(FILLP_SUPPORT_BW_DET))
181     if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
182         int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
183         int32_t tvCount = 1;
184         QosTv metricList = {};
185 
186         if (info->evt == FT_EVT_BW_DET) {
187             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
188                 "[Metric Return]: Fillp bandwidth information of socket(%d) is returned", fd);
189             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
190                 "[Metric Return]: Changed amount of current available bandwidth is: %d", info->info.bwInfo.bwStat);
191             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
192                 "[Metric Return]: Current bandwidth for receiving data is: %d kbps", info->info.bwInfo.rate);
193             metricList.type = BANDWIDTH_ESTIMATE_VALUE;
194             metricList.info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
195             metricList.info.bandwidthInfo.rate = info->info.bwInfo.rate;
196         }
197         if (info->evt == FT_EVT_JITTER_DET) {
198             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
199                 "[Metric Return]: Fillp connection quality information of socket(%d) is returned", fd);
200             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
201                 "[Metric Return]: Predeicted network condition is: %d", info->info.jitterInfo.jitterLevel);
202             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
203                 "[Metric Return]: Current available receiving buffer is: %d ms", info->info.jitterInfo.bufferTime);
204             metricList.type = JITTER_DETECTION_VALUE;
205             metricList.info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
206             metricList.info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
207         }
208         metricList.info.wifiChannelInfo = {};
209         metricList.info.frameStatusInfo = {};
210 
211         auto itLock = g_streamSocketLockMap.find(fd);
212         if (itLock != g_streamSocketLockMap.end()) {
213             auto itListener = g_streamReceiverMap.find(fd);
214             if (itListener != g_streamReceiverMap.end()) {
215                 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
216                     std::lock_guard<std::mutex> guard(itLock->second);
217                     itListener->second->OnQosEvent(eventId, tvCount, &metricList);
218                 }).detach();
219             } else {
220                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty in the map", fd);
221             }
222         } else {
223             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamSocketLock for fd = %d is empty in the map", fd);
224         }
225     } else {
226         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
227             "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
228         return -1;
229     }
230 #endif
231     return 0;
232 }
233 
FillpAppStatistics()234 void VtpStreamSocket::FillpAppStatistics()
235 {
236     int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
237     int32_t tvCount = 1;
238     QosTv metricList = {};
239     FillpStatisticsPcb fillpPcbStats = {};
240     struct timeval fillpStatsGetTime = {};
241 
242     int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
243     gettimeofday(&fillpStatsGetTime, nullptr);
244     if (getStatisticsRet == 0) {
245         metricList.type = STREAM_TRAFFIC_STASTICS;
246         metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.tv_sec *
247             MS_PER_SECOND + fillpStatsGetTime.tv_usec / US_PER_MS)); /* ms */
248         metricList.info.appStatistics.periodRecvBits =
249             static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
250         metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
251         metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
252         metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
253         metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
254         metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
255         metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
256         metricList.info.appStatistics.periodRecvPktLossHighPrecision =
257             fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
258         metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
259         metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
260         metricList.info.appStatistics.periodSendPktLossHighPrecision =
261             fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
262         metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
263         metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
264 
265         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
266             "Succeed to get fillp statistics information for streamfd = %d", streamFd_);
267         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
268             "[Metric Return]: periodRtt is: %d", fillpPcbStats.appFcStastics.periodRtt);
269 
270         std::lock_guard<std::mutex> guard(streamSocketLock_);
271 
272         if (streamReceiver_ != nullptr) {
273             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
274                 "[Metric Notify]: Fillp traffic statistics information of socket(%d) is notified", streamFd_);
275             streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
276         } else {
277             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty", streamFd_);
278         }
279     } else {
280         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
281             "Fail to get fillp statistics information for streamfd = %d, errorcode = %d", streamFd_, FtGetErrno());
282     }
283 }
284 
CreateClient(IpAndPort & local,int streamType,const std::string & sessionKey)285 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, const std::string &sessionKey)
286 {
287     int fd = CreateAndBindSocket(local);
288     if (fd == -1) {
289         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "CreateAndBindSocket failed, errorcode:%d", FtGetErrno());
290         DestroyStreamSocket();
291         return false;
292     }
293 
294     sessionKey_ = sessionKey;
295     streamType_ = streamType;
296     std::lock_guard<std::mutex> guard(streamSocketLock_);
297     streamFd_ = fd;
298     configCv_.notify_all();
299 
300     SetDefaultConfig(fd);
301     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
302         "Success to create a client socket(%d) of stream type(%d)", fd, streamType);
303     return true;
304 }
305 
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,const std::string & sessionKey)306 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
307     const std::string &sessionKey)
308 {
309     if (!CreateClient(local, streamType, sessionKey)) {
310         return false;
311     }
312     /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
313 #ifdef FILLP_SUPPORT_BW_DET
314     bool isServer = false;
315     EnableBwEstimationAlgo(streamFd_, isServer);
316 #endif
317 
318     bool connectRet = Connect(remote);
319 #ifdef FILLP_SUPPORT_BW_DET
320     if (connectRet == true) {
321         bool isServer = false;
322         RegisterMetricCallback(isServer); /* register the callback function */
323     }
324 #endif
325     return connectRet;
326 }
327 
CreateServer(IpAndPort & local,int streamType,const std::string & sessionKey)328 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, const std::string &sessionKey)
329 {
330     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "CreateVtpServer start");
331     listenFd_ = CreateAndBindSocket(local);
332     if (listenFd_ == -1) {
333         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "create listenFd failed, errorcode %d", FtGetErrno());
334         DestroyStreamSocket();
335         return false;
336     }
337 
338     bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
339     if (ret != 0) {
340         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtListen failed, ret :%d errorcode %d", ret, FtGetErrno());
341         DestroyStreamSocket();
342         return false;
343     }
344 
345     epollFd_ = FtEpollCreate();
346     if (epollFd_ < 0) {
347         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
348         DestroyStreamSocket();
349         return false;
350     }
351     isStreamRecv_ = true;
352     streamType_ = streamType;
353     sessionKey_ = sessionKey;
354     auto self = this->GetSelf();
355     std::thread([self]() { self->NotifyStreamListener(); }).detach();
356 
357     std::thread([self]() {
358         if (!self->Accept()) {
359             self->DestroyStreamSocket();
360             return;
361         }
362         self->DoStreamRecv();
363         self->DestroyStreamSocket();
364     }).detach();
365 
366     bool &isDestroyed = isDestroyed_;
367     std::thread([self, &isDestroyed]() {
368         while (isDestroyed == false) {
369             self->FillpAppStatistics();
370             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
371         }
372     }).detach();
373 
374     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
375         "CreateServer end, listenFd:%d, epollFd:%d, streamType:%d", listenFd_, epollFd_, streamType_);
376     return true;
377 }
378 
DestroyStreamSocket()379 void VtpStreamSocket::DestroyStreamSocket()
380 {
381     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket start");
382     std::lock_guard<std::mutex> guard(streamSocketLock_);
383     if (isDestroyed_) {
384         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "StreamSocket is already destroyed");
385         return;
386     }
387     if (listenFd_ != -1) {
388         FtClose(listenFd_);
389         listenFd_ = -1;
390     }
391 
392     if (streamFd_ != -1) {
393         RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
394         RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
395         FtClose(streamFd_);
396         streamFd_ = -1;
397     }
398 
399     if (epollFd_ != -1) {
400         FtClose(epollFd_);
401         epollFd_ = -1;
402     }
403 
404     if (streamReceiver_ != nullptr) {
405         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket receiver delete");
406         streamReceiver_->OnStreamStatus(STREAM_CLOSED);
407         streamReceiver_.reset();
408     }
409 
410     QuitStreamBuffer();
411     vtpInstance_->UpdateSocketStreamCount(false);
412     isDestroyed_ = true;
413     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket end");
414 }
415 
Connect(const IpAndPort & remote)416 bool VtpStreamSocket::Connect(const IpAndPort &remote)
417 {
418     if (remote.ip.empty()) {
419         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "remote addr  error, ip is nullptr");
420         DestroyStreamSocket();
421         return false;
422     }
423 
424     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
425         "Connect to server(server port:%d)", remote.port);
426     remoteIpPort_ = remote;
427 
428     struct sockaddr_in remoteSockAddr;
429     remoteSockAddr.sin_family = AF_INET;
430     remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
431     remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
432 
433     int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
434     if (ret != 0) {
435         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtConnect failed, ret :%d, errorno: %d", ret, FtGetErrno());
436         DestroyStreamSocket();
437         return false;
438     }
439 
440     epollFd_ = FtEpollCreate();
441     if (epollFd_ < 0) {
442         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
443         DestroyStreamSocket();
444         return false;
445     }
446 
447     if (SetSocketEpollMode(streamFd_) != ERR_OK) {
448         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", streamFd_);
449         DestroyStreamSocket();
450         return false;
451     }
452     isStreamRecv_ = true;
453     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to connect remote, and create a thread to recv data.");
454 
455     auto self = this->GetSelf();
456     std::thread([self]() { self->NotifyStreamListener(); }).detach();
457 
458     std::thread([self]() {
459         self->DoStreamRecv();
460         self->DestroyStreamSocket();
461     }).detach();
462 
463     bool &isDestroyed = isDestroyed_;
464     std::thread([self, &isDestroyed]() {
465         while (isDestroyed == false) {
466             self->FillpAppStatistics();
467             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
468         }
469     }).detach();
470     return true;
471 }
472 
Send(std::unique_ptr<IStream> stream)473 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
474 {
475     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "send in..., streamType:%d, data size:%zd, ext size:%zd", streamType_,
476         stream->GetBufferLen(), stream->GetExtBufferLen());
477 
478     if (!isBlocked_) {
479         isBlocked_ = true;
480         if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
481             return false;
482         }
483     }
484 
485     std::unique_ptr<char[]> data = nullptr;
486     ssize_t len = 0;
487     if (streamType_ == RAW_STREAM) {
488         data = stream->GetBuffer();
489         len = stream->GetBufferLen();
490     } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
491         StreamPacketizer packet(streamType_, std::move(stream));
492 
493         auto plainData = packet.PacketizeStream();
494         if (plainData == nullptr) {
495             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "PacketizeStream failed");
496             return false;
497         }
498         len = packet.GetPacketLen() + GetEncryptOverhead();
499         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
500             "packet.GetPacketLen() = %zd, GetEncryptOverhead() = %zd", packet.GetPacketLen(), GetEncryptOverhead());
501         data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
502         ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(),
503             data.get() + FRAME_HEADER_LEN, len);
504         if (encLen != len) {
505             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
506                 "encrypted failed, dataLen = %zd, encryptLen = %zd", len, encLen);
507             return false;
508         }
509         InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
510         len += FRAME_HEADER_LEN;
511     }
512 
513     int ret = FtSend(streamFd_, data.get(), len, 0);
514     if (ret == -1) {
515         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "send failed, errorno: %d", FtGetErrno());
516         return false;
517     }
518 
519     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "send out..., streamType:%d, data size:%zd", streamType_, len);
520     return true;
521 }
522 
SetOption(int type,const StreamAttr & value)523 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
524 {
525     PrintOptionInfo(type, value);
526     auto it = optFuncMap_.find(type);
527     if (it == optFuncMap_.end()) {
528         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "not found type = %d", type);
529         return false;
530     }
531 
532     if (value.GetType() != it->second.valueType) {
533         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN,
534             "type = %d, value.type = %d", value.GetType(), it->second.valueType);
535         return false;
536     }
537 
538     MySetFunc set = it->second.set;
539     if (set == nullptr) {
540         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "set is nullptr");
541         return false;
542     }
543 
544     if (type == NON_BLOCK || type == TOS) {
545         return (this->*set)(static_cast<int>(streamFd_), value);
546     }
547 
548     auto outerIt = FILLP_TYPE_MAP.find(type);
549     if (outerIt != FILLP_TYPE_MAP.end()) {
550         return (this->*set)(outerIt->second, value);
551     }
552 
553     auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
554     if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
555         return (this->*set)(innerIt->second, value);
556     }
557 
558     return (this->*set)(static_cast<int>(type), value);
559 }
560 
GetOption(int type) const561 StreamAttr VtpStreamSocket::GetOption(int type) const
562 {
563     StreamAttr attr {};
564     auto it = optFuncMap_.find(type);
565     if (it != optFuncMap_.end()) {
566         MyGetFunc get = it->second.get;
567         if (get == nullptr) {
568             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Can not get option:%d", type);
569             return std::move(StreamAttr());
570         }
571         if (type == NON_BLOCK) {
572             attr = (this->*get)(static_cast<int>(streamFd_));
573         }
574         attr = (this->*get)(static_cast<int>(type));
575     }
576 
577     PrintOptionInfo(type, attr);
578     return attr;
579 }
580 
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)581 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
582 {
583     if (receiver == nullptr) {
584         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "receiver is nullptr");
585         return false;
586     }
587 
588     std::lock_guard<std::mutex> guard(streamSocketLock_);
589     streamReceiver_ = receiver;
590     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set receiver success");
591     return true;
592 }
593 
InitVtpInstance(const std::string & pkgName)594 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
595 {
596     return vtpInstance_->InitVtp(pkgName);
597 }
598 
DestroyVtpInstance(const std::string & pkgName)599 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
600 {
601     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance start");
602     vtpInstance_->DestroyVtp(pkgName);
603     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance end");
604 }
605 
CreateAndBindSocket(IpAndPort & local)606 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local)
607 {
608     localIpPort_ = local;
609     vtpInstance_->UpdateSocketStreamCount(true);
610     if (local.ip.empty()) {
611         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "ip is empty");
612         return -1;
613     }
614 
615     int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
616     if (sockFd == -1) {
617         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtSocket failed, errorcode = %d", FtGetErrno());
618         return -1;
619     }
620 
621     // bind
622     sockaddr_in localSockAddr = {0};
623     localSockAddr.sin_family = AF_INET;
624     localSockAddr.sin_port = htons((short)local.port);
625     localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
626 
627     socklen_t localAddrLen = sizeof(localSockAddr);
628     int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
629     if (ret == -1) {
630         FtClose(sockFd);
631         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtBind failed, errorcode %d", FtGetErrno());
632         return -1;
633     }
634 
635     // 获取port
636     ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
637     if (ret != ERR_OK) {
638         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "getsockname error ret: %d, errorcode :%d", ret, FtGetErrno());
639         FtClose(sockFd);
640         return -1;
641     }
642 
643     char host[ADDR_MAX_SIZE];
644     localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
645     localIpPort_.ip = inet_ntop(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
646     local.port = localIpPort_.port;
647 
648     if (SetSocketBoundInner(sockFd, localIpPort_.ip) == false) {
649         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketBoundInner failed, errorcode :%d", FtGetErrno());
650     }
651     return sockFd;
652 }
653 
654 
EnableBwEstimationAlgo(int streamFd,bool isServer) const655 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
656 {
657 #ifdef FILLP_SUPPORT_BW_DET
658     int errBwDet;
659     if (isServer == true) {
660         int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
661         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
662             &enableBwDet, sizeof(enableBwDet));
663     } else {
664         int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
665         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
666             &enableBwDet, sizeof(enableBwDet));
667     }
668     if (errBwDet < 0) {
669         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
670             "Fail to enable bandwidth estimation algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
671         return true;
672     } else {
673         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
674             "Success to enable bandwidth estimation algorithm for stream: %d", streamFd);
675         return false;
676     }
677 #else
678     return true;
679 #endif
680 }
681 
EnableJitterDetectionAlgo(int streamFd) const682 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
683 {
684 #ifdef FILLP_SUPPORT_CQE
685     int32_t  enableCQE = FILLP_CQE_ENABLE;
686     auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
687     if (errCQE < 0) {
688         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
689             "Fail to enable CQE algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
690         return true;
691     } else {
692         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
693             "Success to enable CQE algorithm for stream: %d", streamFd);
694         return false;
695     }
696 #else
697     return true;
698 #endif
699 }
700 
RegisterMetricCallback(bool isServer)701 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
702 {
703     VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
704     VtpStreamSocket::AddStreamSocketListener(streamFd_, streamReceiver_);
705 #if (defined(FILLP_SUPPORT_BW_DET) && defined(FILLP_SUPPORT_BW_DET))
706     int regStatisticsRet = FtApiRegEventCallbackFunc(streamFd_, FillpBwAndJitterStatistics);
707     if (isServer == true) {
708         if (regStatisticsRet == 0) {
709             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
710                 "Success to register the stream callback function at server side with Fd = %d", streamFd_);
711         } else {
712             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
713                 "Fail to register the stream callback function at server side with Fd = %d", streamFd_);
714         }
715     } else {
716         if (regStatisticsRet == 0) {
717             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
718                 "Success to register the stream callback function at client side with Fd = %d", streamFd_);
719         } else {
720             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
721                 "Fail to register the stream callback function at client side with Fd = %d", streamFd_);
722         }
723     }
724 #endif
725 }
726 
Accept()727 bool VtpStreamSocket::Accept()
728 {
729     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "accept start");
730 
731     auto fd = FtAccept(listenFd_, nullptr, nullptr);
732     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept streamFd:%d", fd);
733     if (fd == -1) {
734         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "errorcode = %d", FtGetErrno());
735         return false;
736     }
737 
738     sockaddr remoteAddr {};
739     socklen_t remoteAddrLen = sizeof(remoteAddr);
740     auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
741     if (ret != ERR_OK) {
742         FtClose(fd);
743         return false;
744     }
745 
746     char host[ADDR_MAX_SIZE];
747     if (remoteAddr.sa_family == AF_INET) {
748         auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
749         remoteIpPort_.ip = inet_ntop(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
750         remoteIpPort_.port = v4Addr->sin_port;
751     } else {
752         auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
753         remoteIpPort_.ip = inet_ntop(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
754         remoteIpPort_.port = v6Addr->sin6_port;
755     }
756 
757     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
758         "Accept a client(server port:%d)", remoteIpPort_.port);
759     SetDefaultConfig(fd);
760 
761     if (SetSocketEpollMode(fd) != ERR_OK) {
762         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", fd);
763         FtClose(fd);
764         return false;
765     }
766 
767     std::lock_guard<std::mutex> guard(streamSocketLock_);
768     streamFd_ = fd;
769     configCv_.notify_all();
770 
771     if (streamReceiver_ != nullptr) {
772         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify stream connected!");
773         streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
774     }
775 
776     /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
777 #ifdef FILLP_SUPPORT_BW_DET
778     bool isServer = true;
779     EnableBwEstimationAlgo(streamFd_, isServer);
780     RegisterMetricCallback(isServer); /* register the callback function */
781 #endif
782 #ifdef FILLP_SUPPORT_CQE
783     EnableJitterDetectionAlgo(streamFd_);
784 #endif
785 
786     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept success!");
787     return true;
788 }
789 
EpollTimeout(int fd,int timeout)790 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
791 {
792     struct SpungeEpollEvent events[MAX_EPOLL_NUM];
793     (void)memset_s(events, sizeof(events), 0, sizeof(events));
794     while (true) {
795         FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
796         if (fdNum <= 0) {
797             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
798                 "FtEpollWait failed, ret = %d, errno = %d", fdNum, FtGetErrno());
799             return -FtGetErrno();
800         }
801 
802         for (FILLP_INT i = 0; i < fdNum; i++) {
803             if (events[i].data.fd != fd) {
804                 continue;
805             }
806 
807             if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
808                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
809                     "EpollTimeout, something may be wrong in this socket, fd = %d, events = %u", fd,
810                     (unsigned int)events[i].events);
811                 return -1;
812             }
813 
814             if (events[i].events & SPUNGE_EPOLLIN) {
815                 return 0;
816             }
817         }
818     }
819 }
820 
SetSocketEpollMode(int fd)821 int VtpStreamSocket::SetSocketEpollMode(int fd)
822 {
823     if (!SetNonBlockMode(fd, StreamAttr(true))) {
824         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetNonBlockMode failed, errno = %d", FtGetErrno());
825         return -1;
826     }
827 
828     struct SpungeEpollEvent event = {0};
829     event.events = SPUNGE_EPOLLIN;
830     event.data.fd = fd;
831 
832     auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
833     if (ret != ERR_OK) {
834         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtEpollCtl failed, ret = %d, errno = %d", ret, FtGetErrno());
835         return ret;
836     }
837 
838     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "SetNonBlockMode success");
839     return 0;
840 }
841 
InsertBufferLength(int num,int length,uint8_t * output) const842 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
843 {
844     for (int i = 0; i < length; i++) {
845         output[length - 1 - i] = static_cast<unsigned int>(
846             ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
847     }
848 }
849 
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const850 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
851 {
852     std::unique_ptr<IStream> stream = nullptr;
853     switch (streamType_) {
854         case VIDEO_SLICE_STREAM:
855             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "do not support VIDEO_SLICE_STREAM type = %d", streamType_);
856             break;
857         case COMMON_VIDEO_STREAM:
858         case COMMON_AUDIO_STREAM:
859             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
860                 "streamType = %d, seqnum=%d, streamid=%d", streamType_, info.seqNum, info.streamId);
861             stream = IStream::MakeCommonStream(data, info);
862             break;
863         case RAW_STREAM:
864             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "streamType = %d", streamType_);
865             stream = IStream::MakeRawStream(data, info);
866             break;
867         default:
868             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "do not support type = %d", streamType_);
869             break;
870     }
871     if (stream == nullptr) {
872         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "IStream construct error");
873         return nullptr;
874     }
875 
876     return stream;
877 }
878 
RecvStreamLen()879 int VtpStreamSocket::RecvStreamLen()
880 {
881     int hdrSize = FRAME_HEADER_LEN;
882     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
883         hdrSize = streamHdrSize_;
884     }
885 
886     int len = -1;
887     int timeout = -1;
888     auto buffer = std::make_unique<char[]>(hdrSize);
889     if (EpollTimeout(streamFd_, timeout) == 0) {
890         do {
891             len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
892         } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
893     }
894     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv frame header, len = %d, scene:%d", len, scene_);
895 
896     if (len <= 0) {
897         return -1;
898     }
899 
900     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
901         std::lock_guard<std::mutex> guard(streamSocketLock_);
902         if (streamReceiver_ != nullptr) {
903             return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
904         }
905     }
906 
907     return ntohl(*reinterpret_cast<int *>(buffer.get()));
908 }
909 
DoStreamRecv()910 void VtpStreamSocket::DoStreamRecv()
911 {
912     while (isStreamRecv_) {
913         std::unique_ptr<char[]> dataBuffer = nullptr;
914         std::unique_ptr<char[]> extBuffer = nullptr;
915         int extLen = 0;
916         StreamFrameInfo info = {};
917         int dataLength = 0;
918         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv stream");
919         dataLength = VtpStreamSocket::RecvStreamLen();
920         if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
921             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame length error, dataLength = %d", dataLength);
922             break;
923         }
924         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
925             "recv a new frame, dataLength = %d, stream type:%d", dataLength, streamType_);
926         dataBuffer = VtpStreamSocket::RecvStream(dataLength);
927 
928         if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
929             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv common stream");
930             int decryptedLength = dataLength;
931             auto decryptedBuffer = std::move(dataBuffer);
932 
933             int plainDataLength = decryptedLength - GetEncryptOverhead();
934             std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
935             ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
936             if (decLen != plainDataLength) {
937                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
938                     "Decrypt failed, dataLength = %d, decryptedLen = %zd", plainDataLength, decLen);
939                 break;
940             }
941             auto header = plainData.get();
942             StreamDepacketizer decode(streamType_);
943             decode.DepacketizeHeader(header);
944 
945             auto buffer = plainData.get() + sizeof(CommonHeader);
946             decode.DepacketizeBuffer(buffer);
947 
948             extBuffer = decode.GetUserExt();
949             extLen = decode.GetUserExtSize();
950             info.seqNum = decode.GetSeqNum();
951             info.streamId = decode.GetStreamId();
952             dataBuffer = decode.GetData();
953             dataLength = decode.GetDataLength();
954             if (dataLength <= 0) {
955                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
956                     "common depacketize error, dataLength = %d", dataLength);
957                 break;
958             }
959         }
960 
961         StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
962         std::unique_ptr<IStream> stream = MakeStreamData(data, info);
963         if (stream == nullptr) {
964             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "MakeStreamData failed, stream == nullptr");
965             break;
966         }
967 
968         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
969             "recv frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
970 
971         if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
972             std::lock_guard<std::mutex> guard(streamSocketLock_);
973             if (streamReceiver_ != nullptr) {
974                 streamReceiver_->OnStreamReceived(std::move(stream));
975                 continue;
976             }
977         }
978 
979         PutStream(std::move(stream));
980         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
981             "put frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
982     }
983     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv thread exit");
984 }
985 
RecvStream(int dataLength)986 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int dataLength)
987 {
988     auto buffer = std::make_unique<char[]>(dataLength);
989     int recvLen = 0;
990     while (recvLen < dataLength) {
991         int ret = -1;
992         int timeout = -1;
993 
994         if (EpollTimeout(streamFd_, timeout) == 0) {
995             do {
996                 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
997             } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
998         }
999 
1000         if (ret == -1) {
1001             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame failed, errno: %d", FtGetErrno());
1002             return nullptr;
1003         }
1004 
1005         recvLen += ret;
1006     }
1007     return std::unique_ptr<char[]>(buffer.release());
1008 }
1009 
SetDefaultConfig(int fd)1010 void VtpStreamSocket::SetDefaultConfig(int fd)
1011 {
1012     if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1013         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "SetIpTos failed");
1014     }
1015 
1016     if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1017         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send buff failed");
1018     }
1019 
1020     if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1021         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set recv cache failed");
1022     }
1023 
1024     if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1025         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send cache failed");
1026     }
1027 }
1028 
SetIpTos(int fd,const StreamAttr & tos)1029 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1030 {
1031     auto tmp = tos.GetIntValue();
1032     if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1033         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetIpTos wrong! fd=%d, errorcode=%d", fd, FtGetErrno());
1034         return false;
1035     }
1036 
1037     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to set ip tos: fd=%d, tos=%d", fd, tmp);
1038     return true;
1039 }
1040 
GetIpTos(int type) const1041 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1042 {
1043     static_cast<void>(type);
1044     int tos;
1045     int size = sizeof(tos);
1046 
1047     if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1048         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtGetSockOpt errorcode = %d", FtGetErrno());
1049         return std::move(StreamAttr());
1050     }
1051 
1052     return std::move(StreamAttr(tos));
1053 }
1054 
GetStreamSocketFd(int type) const1055 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1056 {
1057     static_cast<void>(type);
1058     return std::move(StreamAttr(streamFd_));
1059 }
1060 
GetListenSocketFd(int type) const1061 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1062 {
1063     static_cast<void>(type);
1064     return std::move(StreamAttr(listenFd_));
1065 }
1066 
SetSocketBoundInner(int fd,std::string ip) const1067 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1068 {
1069     auto boundIp = (ip == "") ? localIpPort_.ip : ip;
1070     struct ifaddrs *ifList = nullptr;
1071     if (getifaddrs(&ifList) < 0) {
1072         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1073             "get interface address return error %d (%s)", errno, strerror(errno));
1074         return false;
1075     }
1076 
1077     struct ifaddrs *ifa = nullptr;
1078     for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1079         if (ifa->ifa_addr == nullptr) {
1080             continue;
1081         }
1082         if (ifa->ifa_addr->sa_family != AF_INET) {
1083             continue;
1084         }
1085 
1086         char host[ADDR_MAX_SIZE];
1087         std::string devName(ifa->ifa_name);
1088         if (strcmp(boundIp.c_str(), inet_ntop(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1089             host, ADDR_MAX_SIZE)) == 0) {
1090             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "current use interface %s to bind to socket", ifa->ifa_name);
1091             auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1092             if (err < 0) {
1093                 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "fail to set socket binding to device");
1094                 freeifaddrs(ifList);
1095                 return false;
1096             }
1097             break;
1098         }
1099     }
1100     freeifaddrs(ifList);
1101 
1102     return true;
1103 }
1104 
SetSocketBindToDevices(int type,const StreamAttr & ip)1105 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1106 {
1107     static_cast<void>(type);
1108     auto tmp = ip.GetStrValue();
1109     auto boundIp = (tmp == "") ? localIpPort_.ip : tmp;
1110     return SetSocketBoundInner(streamFd_, boundIp);
1111 }
1112 
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1113 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1114 {
1115     std::unique_lock<std::mutex> lock(streamSocketLock_);
1116     if (streamFd_ == -1) {
1117         configCv_.wait(lock, [this] { return streamFd_ != -1; });
1118     }
1119     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config, streamFd = %d", streamFd_);
1120     return SetVtpStackConfig(type, value);
1121 }
1122 
SetVtpStackConfig(int type,const StreamAttr & value)1123 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1124 {
1125     if (streamFd_ == -1) {
1126         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config when streamFd is legal");
1127         auto self = GetSelf();
1128         std::thread([self, type, value]() { self->SetVtpStackConfigDelayed(type, value); }).detach();
1129         return true;
1130     }
1131 
1132     if (value.GetType() == INT_TYPE) {
1133         int intVal = value.GetIntValue();
1134         int ret = FtConfigSet(type, &intVal, &streamFd_);
1135         if (ret != 0) {
1136             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1137                 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1138             return false;
1139         }
1140 
1141         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1142             "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, intVal);
1143         return true;
1144     }
1145 
1146     if (value.GetType() == BOOL_TYPE) {
1147         bool flag = value.GetBoolValue();
1148         int ret = FtConfigSet(type, &flag, &streamFd_);
1149         if (ret != 0) {
1150             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1151                 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1152             return false;
1153         }
1154 
1155         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1156             "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, flag);
1157         return true;
1158     }
1159 
1160     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "UNKNOWN TYPE!");
1161     return false;
1162 }
1163 
GetVtpStackConfig(int type) const1164 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1165 {
1166     int intVal = -1;
1167     int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1168     int ret = FtConfigGet(type, &intVal, &configFd);
1169     if (ret != 0) {
1170         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1171             "FtConfigGet failed, type = %d, errorcode = %d", type, FtGetErrno());
1172         return std::move(StreamAttr());
1173     }
1174 
1175     int valType = ValueType::UNKNOWN;
1176     for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1177         if (it->second != type) {
1178             continue;
1179         }
1180 
1181         valType = optFuncMap_.at(it->first).valueType;
1182         break;
1183     }
1184 
1185     if (valType != ValueType::UNKNOWN) {
1186         for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1187             if (it->second != type) {
1188                 continue;
1189             }
1190 
1191             valType = optFuncMap_.at(it->first).valueType;
1192             break;
1193         }
1194     }
1195 
1196     if (valType == BOOL_TYPE) {
1197         return std::move(StreamAttr(!!intVal));
1198     }
1199 
1200     return std::move(StreamAttr(intVal));
1201 }
1202 
SetNonBlockMode(int fd,const StreamAttr & value)1203 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1204 {
1205     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1206     if (flags < 0) {
1207         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to get FtFcntl, flags = %d", flags);
1208         flags = 0;
1209     }
1210     bool nonBlock = value.GetBoolValue();
1211 
1212     flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1213         static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1214 
1215     FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1216     if (res < 0) {
1217         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to set FtFcntl, res = %d", res);
1218         return false;
1219     }
1220 
1221     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Successfully to set fd(%d) nonBlock mode = %d", fd, nonBlock);
1222     return true;
1223 }
1224 
GetNonBlockMode(int fd) const1225 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1226 {
1227     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1228     if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1229         return std::move(StreamAttr(true));
1230     }
1231 
1232     return std::move(StreamAttr(false));
1233 }
1234 
GetIp(int type) const1235 StreamAttr VtpStreamSocket::GetIp(int type) const
1236 {
1237     if (type == LOCAL_IP) {
1238         return std::move(StreamAttr(localIpPort_.ip));
1239     }
1240 
1241     return std::move(StreamAttr(remoteIpPort_.ip));
1242 }
1243 
GetPort(int type) const1244 StreamAttr VtpStreamSocket::GetPort(int type) const
1245 {
1246     if (type == LOCAL_PORT) {
1247         return std::move(StreamAttr(localIpPort_.port));
1248     }
1249     return std::move(StreamAttr(remoteIpPort_.port));
1250 }
1251 
SetStreamType(int type,const StreamAttr & value)1252 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1253 {
1254     if (type != STREAM_TYPE_INT) {
1255         return false;
1256     }
1257 
1258     streamType_ = value.GetIntValue();
1259     return true;
1260 }
1261 
GetStreamType(int type) const1262 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1263 {
1264     if (type != STREAM_TYPE_INT) {
1265         return std::move(StreamAttr());
1266     }
1267 
1268     return std::move(StreamAttr(streamType_));
1269 }
1270 
SetStreamScene(int type,const StreamAttr & value)1271 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1272 {
1273     static_cast<void>(type);
1274     if (value.GetType() != INT_TYPE) {
1275         return false;
1276     }
1277     scene_ = value.GetIntValue();
1278     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set scene to %d", scene_);
1279     return true;
1280 }
1281 
SetStreamHeaderSize(int type,const StreamAttr & value)1282 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1283 {
1284     static_cast<void>(type);
1285     if (value.GetType() != INT_TYPE) {
1286         return false;
1287     }
1288     streamHdrSize_ = value.GetIntValue();
1289     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set header size to %d", streamHdrSize_);
1290     return true;
1291 }
1292 
NotifyStreamListener()1293 void VtpStreamSocket::NotifyStreamListener()
1294 {
1295     while (isStreamRecv_) {
1296         int streamNum = GetStreamNum();
1297         if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1298             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Too many data in receiver, num = %d", streamNum);
1299         }
1300 
1301         auto stream = TakeStream();
1302         if (stream == nullptr) {
1303             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Pop stream failed");
1304             break;
1305         }
1306 
1307         std::lock_guard<std::mutex> guard(streamSocketLock_);
1308         if (streamReceiver_ != nullptr) {
1309             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener");
1310             streamReceiver_->OnStreamReceived(std::move(stream));
1311             SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener done.");
1312         }
1313     }
1314     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify thread exit");
1315 }
1316 
GetEncryptOverhead() const1317 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1318 {
1319     return OVERHEAD_LEN;
1320 }
1321 
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1322 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1323 {
1324     AesGcmCipherKey cipherKey = {0};
1325 
1326     if (inLen - OVERHEAD_LEN > outLen) {
1327         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt invalid para.");
1328         return SOFTBUS_ERR;
1329     }
1330 
1331     cipherKey.keyLen = SESSION_KEY_LENGTH;
1332     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.c_str(), SESSION_KEY_LENGTH) != EOK) {
1333         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1334         return SOFTBUS_ERR;
1335     }
1336 
1337     int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1338     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1339     if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1340         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt Data fail. %d", ret);
1341         return SOFTBUS_ENCRYPT_ERR;
1342     }
1343     return outLen;
1344 }
1345 
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1346 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1347 {
1348     AesGcmCipherKey cipherKey = {0};
1349 
1350     if (inLen - OVERHEAD_LEN > outLen) {
1351         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt invalid para.");
1352         return SOFTBUS_ERR;
1353     }
1354 
1355     cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1356     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.c_str(), SESSION_KEY_LENGTH) != EOK) {
1357         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1358         return SOFTBUS_ERR;
1359     }
1360     int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1361     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1362     if (ret != SOFTBUS_OK) {
1363         SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt Data fail. %d ", ret);
1364         return SOFTBUS_DECRYPT_ERR;
1365     }
1366 
1367     return outLen;
1368 }
1369 
GetCryptErrorReason(void) const1370 void VtpStreamSocket::GetCryptErrorReason(void) const
1371 {
1372     SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "Unsupport api");
1373 }
1374 } // namespace SoftBus
1375 } // namespace Communication
1376