1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "vtp_stream_socket.h"
17
18 #include <chrono>
19 #include <ifaddrs.h>
20 #include <memory>
21 #include <netinet/in.h>
22 #include <securec.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <thread>
26
27 #include "fillpinc.h"
28 #include "raw_stream_data.h"
29 #include "session.h"
30 #include "softbus_adapter_crypto.h"
31 #include "softbus_errcode.h"
32 #include "softbus_log.h"
33 #include "stream_depacketizer.h"
34 #include "stream_packetizer.h"
35
36 namespace Communication {
37 namespace SoftBus {
/* Logging switch with external linkage; starts out disabled. It is not read in the visible part of this file. */
bool g_logOn = false;
const int FEED_BACK_PERIOD = 1; /* feedback period of fillp stream traffic statistics is 1s */
/* Conversion factors used by FillpAppStatistics() to turn a struct timeval into milliseconds. */
const int MS_PER_SECOND = 1000;
const int US_PER_MS = 1000;
42
43 namespace {
PrintOptionInfo(int type,const StreamAttr & value)44 void PrintOptionInfo(int type, const StreamAttr &value)
45 {
46 switch (value.GetType()) {
47 case INT_TYPE:
48 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
49 "Int option: type:%d, value:%d", type, value.GetIntValue());
50 break;
51 case BOOL_TYPE:
52 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
53 "Bool option: type:%d, value:%d", type, value.GetBoolValue());
54 break;
55 case STRING_TYPE:
56 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
57 "String option: type:%d, value:%s", type, value.GetStrValue().c_str());
58 break;
59 default:
60 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Wrong StreamAttr!");
61 (void)type;
62 }
63 }
64 } // namespace
/* Static VTP instance handle, obtained once from VtpInstance::GetVtpInstance() during static initialization. */
std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();

/*
 * Static per-fd registries. FillpBwAndJitterStatistics() receives only an fd, so it looks up the
 * socket mutex and the listener through these maps. Entries are added by RegisterMetricCallback()
 * and removed by DestroyStreamSocket().
 * NOTE(review): no lock protecting these maps is visible in this file - confirm callers serialize access.
 */
std::map<int, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
std::map<int, std::shared_ptr<IStreamSocketListener>> VtpStreamSocket::g_streamReceiverMap;
69
AddStreamSocketLock(int fd,std::mutex & streamsocketlock)70 void VtpStreamSocket::AddStreamSocketLock(int fd, std::mutex &streamsocketlock)
71 {
72 if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
73 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamsocketlock for fd = %d already exists", fd);
74 return;
75 }
76
77 g_streamSocketLockMap.emplace(std::pair<int, std::mutex &>(fd, streamsocketlock));
78 }
79
AddStreamSocketListener(int fd,std::shared_ptr<IStreamSocketListener> streamreceiver)80 void VtpStreamSocket::AddStreamSocketListener(int fd, std::shared_ptr<IStreamSocketListener> streamreceiver)
81 {
82 if (!g_streamReceiverMap.empty() && g_streamReceiverMap.find(fd) != g_streamReceiverMap.end()) {
83 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "streamreceiver for fd = %d already exists", fd);
84 return;
85 }
86
87 g_streamReceiverMap.emplace(std::pair<int, std::shared_ptr<IStreamSocketListener>>(fd, streamreceiver));
88 }
89
RemoveStreamSocketLock(int fd)90 void VtpStreamSocket::RemoveStreamSocketLock(int fd)
91 {
92 if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
93 g_streamSocketLockMap.erase(fd);
94 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamsocketlock for fd = %d success", fd);
95 } else {
96 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
97 "Streamsocketlock for fd = %d not exist in the map", fd);
98 }
99 return;
100 }
101
RemoveStreamSocketListener(int fd)102 void VtpStreamSocket::RemoveStreamSocketListener(int fd)
103 {
104 if (g_streamReceiverMap.find(fd) != g_streamReceiverMap.end()) {
105 g_streamReceiverMap.erase(fd);
106 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Remove streamreceiver for fd = %d success", fd);
107 } else {
108 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Streamreceiver for fd = %d not exist in the map", fd);
109 }
110 return;
111 }
112
InsertElementToFuncMap(int type,ValueType valueType,MySetFunc set,MyGetFunc get)113 void VtpStreamSocket::InsertElementToFuncMap(int type, ValueType valueType, MySetFunc set, MyGetFunc get)
114 {
115 OptionFunc fun = {
116 valueType, set, get
117 };
118 optFuncMap_.insert(std::pair<int, OptionFunc>(type, fun));
119 }
120
VtpStreamSocket()121 VtpStreamSocket::VtpStreamSocket()
122 {
123 InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
124 InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
125 InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
126 InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
127 InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
128 InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
129 InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
130 InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
131 InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
132 InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
133 InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
134 InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
135 InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
136 &VtpStreamSocket::GetVtpStackConfig);
137 InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
138 &VtpStreamSocket::GetVtpStackConfig);
139 InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
140 &VtpStreamSocket::GetVtpStackConfig);
141 InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
142 &VtpStreamSocket::GetVtpStackConfig);
143 InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
144 &VtpStreamSocket::GetVtpStackConfig);
145 InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
146 &VtpStreamSocket::GetVtpStackConfig);
147 InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
148 &VtpStreamSocket::GetVtpStackConfig);
149 InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
150 &VtpStreamSocket::GetVtpStackConfig);
151 InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
152 &VtpStreamSocket::GetVtpStackConfig);
153 InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
154 &VtpStreamSocket::GetVtpStackConfig);
155 InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
156 &VtpStreamSocket::GetVtpStackConfig);
157 InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
158 &VtpStreamSocket::GetVtpStackConfig);
159 InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
160 InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
161 InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
162 InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
163
164 scene_ = UNKNOWN_SCENE;
165 }
166
/*
 * Destructor. It only logs; descriptors and the listener are released by DestroyStreamSocket(),
 * which is not called from here.
 */
VtpStreamSocket::~VtpStreamSocket()
{
    SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "~VtpStreamSocket");
}
171
GetSelf()172 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
173 {
174 return shared_from_this();
175 }
176
177 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpBwAndJitterStatistics(int fd,const FtEventCbkInfo * info)178 int VtpStreamSocket::FillpBwAndJitterStatistics(int fd, const FtEventCbkInfo *info)
179 {
180 #if (defined(FILLP_SUPPORT_BW_DET) && defined(FILLP_SUPPORT_BW_DET))
181 if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
182 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
183 int32_t tvCount = 1;
184 QosTv metricList = {};
185
186 if (info->evt == FT_EVT_BW_DET) {
187 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
188 "[Metric Return]: Fillp bandwidth information of socket(%d) is returned", fd);
189 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
190 "[Metric Return]: Changed amount of current available bandwidth is: %d", info->info.bwInfo.bwStat);
191 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
192 "[Metric Return]: Current bandwidth for receiving data is: %d kbps", info->info.bwInfo.rate);
193 metricList.type = BANDWIDTH_ESTIMATE_VALUE;
194 metricList.info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
195 metricList.info.bandwidthInfo.rate = info->info.bwInfo.rate;
196 }
197 if (info->evt == FT_EVT_JITTER_DET) {
198 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
199 "[Metric Return]: Fillp connection quality information of socket(%d) is returned", fd);
200 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
201 "[Metric Return]: Predeicted network condition is: %d", info->info.jitterInfo.jitterLevel);
202 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
203 "[Metric Return]: Current available receiving buffer is: %d ms", info->info.jitterInfo.bufferTime);
204 metricList.type = JITTER_DETECTION_VALUE;
205 metricList.info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
206 metricList.info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
207 }
208 metricList.info.wifiChannelInfo = {};
209 metricList.info.frameStatusInfo = {};
210
211 auto itLock = g_streamSocketLockMap.find(fd);
212 if (itLock != g_streamSocketLockMap.end()) {
213 auto itListener = g_streamReceiverMap.find(fd);
214 if (itListener != g_streamReceiverMap.end()) {
215 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
216 std::lock_guard<std::mutex> guard(itLock->second);
217 itListener->second->OnQosEvent(eventId, tvCount, &metricList);
218 }).detach();
219 } else {
220 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty in the map", fd);
221 }
222 } else {
223 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamSocketLock for fd = %d is empty in the map", fd);
224 }
225 } else {
226 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
227 "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
228 return -1;
229 }
230 #endif
231 return 0;
232 }
233
FillpAppStatistics()234 void VtpStreamSocket::FillpAppStatistics()
235 {
236 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
237 int32_t tvCount = 1;
238 QosTv metricList = {};
239 FillpStatisticsPcb fillpPcbStats = {};
240 struct timeval fillpStatsGetTime = {};
241
242 int getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
243 gettimeofday(&fillpStatsGetTime, nullptr);
244 if (getStatisticsRet == 0) {
245 metricList.type = STREAM_TRAFFIC_STASTICS;
246 metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.tv_sec *
247 MS_PER_SECOND + fillpStatsGetTime.tv_usec / US_PER_MS)); /* ms */
248 metricList.info.appStatistics.periodRecvBits =
249 static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
250 metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
251 metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
252 metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
253 metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
254 metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
255 metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
256 metricList.info.appStatistics.periodRecvPktLossHighPrecision =
257 fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
258 metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
259 metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
260 metricList.info.appStatistics.periodSendPktLossHighPrecision =
261 fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
262 metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
263 metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
264
265 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
266 "Succeed to get fillp statistics information for streamfd = %d", streamFd_);
267 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
268 "[Metric Return]: periodRtt is: %d", fillpPcbStats.appFcStastics.periodRtt);
269
270 std::lock_guard<std::mutex> guard(streamSocketLock_);
271
272 if (streamReceiver_ != nullptr) {
273 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
274 "[Metric Notify]: Fillp traffic statistics information of socket(%d) is notified", streamFd_);
275 streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
276 } else {
277 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "StreamReceiver for fd = %d is empty", streamFd_);
278 }
279 } else {
280 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
281 "Fail to get fillp statistics information for streamfd = %d, errorcode = %d", streamFd_, FtGetErrno());
282 }
283 }
284
CreateClient(IpAndPort & local,int streamType,const std::string & sessionKey)285 bool VtpStreamSocket::CreateClient(IpAndPort &local, int streamType, const std::string &sessionKey)
286 {
287 int fd = CreateAndBindSocket(local);
288 if (fd == -1) {
289 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "CreateAndBindSocket failed, errorcode:%d", FtGetErrno());
290 DestroyStreamSocket();
291 return false;
292 }
293
294 sessionKey_ = sessionKey;
295 streamType_ = streamType;
296 std::lock_guard<std::mutex> guard(streamSocketLock_);
297 streamFd_ = fd;
298 configCv_.notify_all();
299
300 SetDefaultConfig(fd);
301 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
302 "Success to create a client socket(%d) of stream type(%d)", fd, streamType);
303 return true;
304 }
305
CreateClient(IpAndPort & local,const IpAndPort & remote,int streamType,const std::string & sessionKey)306 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
307 const std::string &sessionKey)
308 {
309 if (!CreateClient(local, streamType, sessionKey)) {
310 return false;
311 }
312 /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
313 #ifdef FILLP_SUPPORT_BW_DET
314 bool isServer = false;
315 EnableBwEstimationAlgo(streamFd_, isServer);
316 #endif
317
318 bool connectRet = Connect(remote);
319 #ifdef FILLP_SUPPORT_BW_DET
320 if (connectRet == true) {
321 bool isServer = false;
322 RegisterMetricCallback(isServer); /* register the callback function */
323 }
324 #endif
325 return connectRet;
326 }
327
CreateServer(IpAndPort & local,int streamType,const std::string & sessionKey)328 bool VtpStreamSocket::CreateServer(IpAndPort &local, int streamType, const std::string &sessionKey)
329 {
330 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "CreateVtpServer start");
331 listenFd_ = CreateAndBindSocket(local);
332 if (listenFd_ == -1) {
333 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "create listenFd failed, errorcode %d", FtGetErrno());
334 DestroyStreamSocket();
335 return false;
336 }
337
338 bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
339 if (ret != 0) {
340 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtListen failed, ret :%d errorcode %d", ret, FtGetErrno());
341 DestroyStreamSocket();
342 return false;
343 }
344
345 epollFd_ = FtEpollCreate();
346 if (epollFd_ < 0) {
347 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
348 DestroyStreamSocket();
349 return false;
350 }
351 isStreamRecv_ = true;
352 streamType_ = streamType;
353 sessionKey_ = sessionKey;
354 auto self = this->GetSelf();
355 std::thread([self]() { self->NotifyStreamListener(); }).detach();
356
357 std::thread([self]() {
358 if (!self->Accept()) {
359 self->DestroyStreamSocket();
360 return;
361 }
362 self->DoStreamRecv();
363 self->DestroyStreamSocket();
364 }).detach();
365
366 bool &isDestroyed = isDestroyed_;
367 std::thread([self, &isDestroyed]() {
368 while (isDestroyed == false) {
369 self->FillpAppStatistics();
370 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
371 }
372 }).detach();
373
374 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
375 "CreateServer end, listenFd:%d, epollFd:%d, streamType:%d", listenFd_, epollFd_, streamType_);
376 return true;
377 }
378
DestroyStreamSocket()379 void VtpStreamSocket::DestroyStreamSocket()
380 {
381 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket start");
382 std::lock_guard<std::mutex> guard(streamSocketLock_);
383 if (isDestroyed_) {
384 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "StreamSocket is already destroyed");
385 return;
386 }
387 if (listenFd_ != -1) {
388 FtClose(listenFd_);
389 listenFd_ = -1;
390 }
391
392 if (streamFd_ != -1) {
393 RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
394 RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
395 FtClose(streamFd_);
396 streamFd_ = -1;
397 }
398
399 if (epollFd_ != -1) {
400 FtClose(epollFd_);
401 epollFd_ = -1;
402 }
403
404 if (streamReceiver_ != nullptr) {
405 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket receiver delete");
406 streamReceiver_->OnStreamStatus(STREAM_CLOSED);
407 streamReceiver_.reset();
408 }
409
410 QuitStreamBuffer();
411 vtpInstance_->UpdateSocketStreamCount(false);
412 isDestroyed_ = true;
413 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyStreamSocket end");
414 }
415
Connect(const IpAndPort & remote)416 bool VtpStreamSocket::Connect(const IpAndPort &remote)
417 {
418 if (remote.ip.empty()) {
419 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "remote addr error, ip is nullptr");
420 DestroyStreamSocket();
421 return false;
422 }
423
424 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
425 "Connect to server(server port:%d)", remote.port);
426 remoteIpPort_ = remote;
427
428 struct sockaddr_in remoteSockAddr;
429 remoteSockAddr.sin_family = AF_INET;
430 remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
431 remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
432
433 int ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
434 if (ret != 0) {
435 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtConnect failed, ret :%d, errorno: %d", ret, FtGetErrno());
436 DestroyStreamSocket();
437 return false;
438 }
439
440 epollFd_ = FtEpollCreate();
441 if (epollFd_ < 0) {
442 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Failed to create epoll fd:%d", FtGetErrno());
443 DestroyStreamSocket();
444 return false;
445 }
446
447 if (SetSocketEpollMode(streamFd_) != ERR_OK) {
448 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", streamFd_);
449 DestroyStreamSocket();
450 return false;
451 }
452 isStreamRecv_ = true;
453 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to connect remote, and create a thread to recv data.");
454
455 auto self = this->GetSelf();
456 std::thread([self]() { self->NotifyStreamListener(); }).detach();
457
458 std::thread([self]() {
459 self->DoStreamRecv();
460 self->DestroyStreamSocket();
461 }).detach();
462
463 bool &isDestroyed = isDestroyed_;
464 std::thread([self, &isDestroyed]() {
465 while (isDestroyed == false) {
466 self->FillpAppStatistics();
467 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
468 }
469 }).detach();
470 return true;
471 }
472
Send(std::unique_ptr<IStream> stream)473 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
474 {
475 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "send in..., streamType:%d, data size:%zd, ext size:%zd", streamType_,
476 stream->GetBufferLen(), stream->GetExtBufferLen());
477
478 if (!isBlocked_) {
479 isBlocked_ = true;
480 if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
481 return false;
482 }
483 }
484
485 std::unique_ptr<char[]> data = nullptr;
486 ssize_t len = 0;
487 if (streamType_ == RAW_STREAM) {
488 data = stream->GetBuffer();
489 len = stream->GetBufferLen();
490 } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
491 StreamPacketizer packet(streamType_, std::move(stream));
492
493 auto plainData = packet.PacketizeStream();
494 if (plainData == nullptr) {
495 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "PacketizeStream failed");
496 return false;
497 }
498 len = packet.GetPacketLen() + GetEncryptOverhead();
499 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
500 "packet.GetPacketLen() = %zd, GetEncryptOverhead() = %zd", packet.GetPacketLen(), GetEncryptOverhead());
501 data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
502 ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(),
503 data.get() + FRAME_HEADER_LEN, len);
504 if (encLen != len) {
505 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
506 "encrypted failed, dataLen = %zd, encryptLen = %zd", len, encLen);
507 return false;
508 }
509 InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
510 len += FRAME_HEADER_LEN;
511 }
512
513 int ret = FtSend(streamFd_, data.get(), len, 0);
514 if (ret == -1) {
515 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "send failed, errorno: %d", FtGetErrno());
516 return false;
517 }
518
519 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "send out..., streamType:%d, data size:%zd", streamType_, len);
520 return true;
521 }
522
SetOption(int type,const StreamAttr & value)523 bool VtpStreamSocket::SetOption(int type, const StreamAttr &value)
524 {
525 PrintOptionInfo(type, value);
526 auto it = optFuncMap_.find(type);
527 if (it == optFuncMap_.end()) {
528 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "not found type = %d", type);
529 return false;
530 }
531
532 if (value.GetType() != it->second.valueType) {
533 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN,
534 "type = %d, value.type = %d", value.GetType(), it->second.valueType);
535 return false;
536 }
537
538 MySetFunc set = it->second.set;
539 if (set == nullptr) {
540 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "set is nullptr");
541 return false;
542 }
543
544 if (type == NON_BLOCK || type == TOS) {
545 return (this->*set)(static_cast<int>(streamFd_), value);
546 }
547
548 auto outerIt = FILLP_TYPE_MAP.find(type);
549 if (outerIt != FILLP_TYPE_MAP.end()) {
550 return (this->*set)(outerIt->second, value);
551 }
552
553 auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
554 if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
555 return (this->*set)(innerIt->second, value);
556 }
557
558 return (this->*set)(static_cast<int>(type), value);
559 }
560
GetOption(int type) const561 StreamAttr VtpStreamSocket::GetOption(int type) const
562 {
563 StreamAttr attr {};
564 auto it = optFuncMap_.find(type);
565 if (it != optFuncMap_.end()) {
566 MyGetFunc get = it->second.get;
567 if (get == nullptr) {
568 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Can not get option:%d", type);
569 return std::move(StreamAttr());
570 }
571 if (type == NON_BLOCK) {
572 attr = (this->*get)(static_cast<int>(streamFd_));
573 }
574 attr = (this->*get)(static_cast<int>(type));
575 }
576
577 PrintOptionInfo(type, attr);
578 return attr;
579 }
580
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)581 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
582 {
583 if (receiver == nullptr) {
584 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "receiver is nullptr");
585 return false;
586 }
587
588 std::lock_guard<std::mutex> guard(streamSocketLock_);
589 streamReceiver_ = receiver;
590 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set receiver success");
591 return true;
592 }
593
InitVtpInstance(const std::string & pkgName)594 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
595 {
596 return vtpInstance_->InitVtp(pkgName);
597 }
598
/*
 * Releases the VTP instance for the given package by delegating to VtpInstance::DestroyVtp().
 * The call is bracketed by start/end log lines.
 *
 * pkgName: package name forwarded unchanged.
 */
void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
{
    SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance start");
    vtpInstance_->DestroyVtp(pkgName);
    SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "DestroyVtpInstance end");
}
605
CreateAndBindSocket(IpAndPort & local)606 int VtpStreamSocket::CreateAndBindSocket(IpAndPort &local)
607 {
608 localIpPort_ = local;
609 vtpInstance_->UpdateSocketStreamCount(true);
610 if (local.ip.empty()) {
611 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "ip is empty");
612 return -1;
613 }
614
615 int sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
616 if (sockFd == -1) {
617 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtSocket failed, errorcode = %d", FtGetErrno());
618 return -1;
619 }
620
621 // bind
622 sockaddr_in localSockAddr = {0};
623 localSockAddr.sin_family = AF_INET;
624 localSockAddr.sin_port = htons((short)local.port);
625 localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
626
627 socklen_t localAddrLen = sizeof(localSockAddr);
628 int ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
629 if (ret == -1) {
630 FtClose(sockFd);
631 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtBind failed, errorcode %d", FtGetErrno());
632 return -1;
633 }
634
635 // 获取port
636 ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
637 if (ret != ERR_OK) {
638 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "getsockname error ret: %d, errorcode :%d", ret, FtGetErrno());
639 FtClose(sockFd);
640 return -1;
641 }
642
643 char host[ADDR_MAX_SIZE];
644 localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
645 localIpPort_.ip = inet_ntop(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
646 local.port = localIpPort_.port;
647
648 if (SetSocketBoundInner(sockFd, localIpPort_.ip) == false) {
649 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketBoundInner failed, errorcode :%d", FtGetErrno());
650 }
651 return sockFd;
652 }
653
654
EnableBwEstimationAlgo(int streamFd,bool isServer) const655 bool VtpStreamSocket::EnableBwEstimationAlgo(int streamFd, bool isServer) const
656 {
657 #ifdef FILLP_SUPPORT_BW_DET
658 int errBwDet;
659 if (isServer == true) {
660 int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
661 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
662 &enableBwDet, sizeof(enableBwDet));
663 } else {
664 int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
665 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
666 &enableBwDet, sizeof(enableBwDet));
667 }
668 if (errBwDet < 0) {
669 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
670 "Fail to enable bandwidth estimation algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
671 return true;
672 } else {
673 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
674 "Success to enable bandwidth estimation algorithm for stream: %d", streamFd);
675 return false;
676 }
677 #else
678 return true;
679 #endif
680 }
681
EnableJitterDetectionAlgo(int streamFd) const682 bool VtpStreamSocket::EnableJitterDetectionAlgo(int streamFd) const
683 {
684 #ifdef FILLP_SUPPORT_CQE
685 int32_t enableCQE = FILLP_CQE_ENABLE;
686 auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
687 if (errCQE < 0) {
688 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
689 "Fail to enable CQE algorithm for stream: %d, errorcode = %d", streamFd, FtGetErrno());
690 return true;
691 } else {
692 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
693 "Success to enable CQE algorithm for stream: %d", streamFd);
694 return false;
695 }
696 #else
697 return true;
698 #endif
699 }
700
RegisterMetricCallback(bool isServer)701 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
702 {
703 VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
704 VtpStreamSocket::AddStreamSocketListener(streamFd_, streamReceiver_);
705 #if (defined(FILLP_SUPPORT_BW_DET) && defined(FILLP_SUPPORT_BW_DET))
706 int regStatisticsRet = FtApiRegEventCallbackFunc(streamFd_, FillpBwAndJitterStatistics);
707 if (isServer == true) {
708 if (regStatisticsRet == 0) {
709 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
710 "Success to register the stream callback function at server side with Fd = %d", streamFd_);
711 } else {
712 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
713 "Fail to register the stream callback function at server side with Fd = %d", streamFd_);
714 }
715 } else {
716 if (regStatisticsRet == 0) {
717 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
718 "Success to register the stream callback function at client side with Fd = %d", streamFd_);
719 } else {
720 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
721 "Fail to register the stream callback function at client side with Fd = %d", streamFd_);
722 }
723 }
724 #endif
725 }
726
Accept()727 bool VtpStreamSocket::Accept()
728 {
729 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "accept start");
730
731 auto fd = FtAccept(listenFd_, nullptr, nullptr);
732 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept streamFd:%d", fd);
733 if (fd == -1) {
734 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "errorcode = %d", FtGetErrno());
735 return false;
736 }
737
738 sockaddr remoteAddr {};
739 socklen_t remoteAddrLen = sizeof(remoteAddr);
740 auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
741 if (ret != ERR_OK) {
742 FtClose(fd);
743 return false;
744 }
745
746 char host[ADDR_MAX_SIZE];
747 if (remoteAddr.sa_family == AF_INET) {
748 auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
749 remoteIpPort_.ip = inet_ntop(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
750 remoteIpPort_.port = v4Addr->sin_port;
751 } else {
752 auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
753 remoteIpPort_.ip = inet_ntop(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
754 remoteIpPort_.port = v6Addr->sin6_port;
755 }
756
757 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
758 "Accept a client(server port:%d)", remoteIpPort_.port);
759 SetDefaultConfig(fd);
760
761 if (SetSocketEpollMode(fd) != ERR_OK) {
762 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetSocketEpollMode failed, fd = %d", fd);
763 FtClose(fd);
764 return false;
765 }
766
767 std::lock_guard<std::mutex> guard(streamSocketLock_);
768 streamFd_ = fd;
769 configCv_.notify_all();
770
771 if (streamReceiver_ != nullptr) {
772 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify stream connected!");
773 streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
774 }
775
776 /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
777 #ifdef FILLP_SUPPORT_BW_DET
778 bool isServer = true;
779 EnableBwEstimationAlgo(streamFd_, isServer);
780 RegisterMetricCallback(isServer); /* register the callback function */
781 #endif
782 #ifdef FILLP_SUPPORT_CQE
783 EnableJitterDetectionAlgo(streamFd_);
784 #endif
785
786 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "accept success!");
787 return true;
788 }
789
EpollTimeout(int fd,int timeout)790 int VtpStreamSocket::EpollTimeout(int fd, int timeout)
791 {
792 struct SpungeEpollEvent events[MAX_EPOLL_NUM];
793 (void)memset_s(events, sizeof(events), 0, sizeof(events));
794 while (true) {
795 FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
796 if (fdNum <= 0) {
797 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
798 "FtEpollWait failed, ret = %d, errno = %d", fdNum, FtGetErrno());
799 return -FtGetErrno();
800 }
801
802 for (FILLP_INT i = 0; i < fdNum; i++) {
803 if (events[i].data.fd != fd) {
804 continue;
805 }
806
807 if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
808 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
809 "EpollTimeout, something may be wrong in this socket, fd = %d, events = %u", fd,
810 (unsigned int)events[i].events);
811 return -1;
812 }
813
814 if (events[i].events & SPUNGE_EPOLLIN) {
815 return 0;
816 }
817 }
818 }
819 }
820
SetSocketEpollMode(int fd)821 int VtpStreamSocket::SetSocketEpollMode(int fd)
822 {
823 if (!SetNonBlockMode(fd, StreamAttr(true))) {
824 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetNonBlockMode failed, errno = %d", FtGetErrno());
825 return -1;
826 }
827
828 struct SpungeEpollEvent event = {0};
829 event.events = SPUNGE_EPOLLIN;
830 event.data.fd = fd;
831
832 auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
833 if (ret != ERR_OK) {
834 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtEpollCtl failed, ret = %d, errno = %d", ret, FtGetErrno());
835 return ret;
836 }
837
838 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "SetNonBlockMode success");
839 return 0;
840 }
841
InsertBufferLength(int num,int length,uint8_t * output) const842 void VtpStreamSocket::InsertBufferLength(int num, int length, uint8_t *output) const
843 {
844 for (int i = 0; i < length; i++) {
845 output[length - 1 - i] = static_cast<unsigned int>(
846 ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
847 }
848 }
849
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const850 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
851 {
852 std::unique_ptr<IStream> stream = nullptr;
853 switch (streamType_) {
854 case VIDEO_SLICE_STREAM:
855 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "do not support VIDEO_SLICE_STREAM type = %d", streamType_);
856 break;
857 case COMMON_VIDEO_STREAM:
858 case COMMON_AUDIO_STREAM:
859 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
860 "streamType = %d, seqnum=%d, streamid=%d", streamType_, info.seqNum, info.streamId);
861 stream = IStream::MakeCommonStream(data, info);
862 break;
863 case RAW_STREAM:
864 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "streamType = %d", streamType_);
865 stream = IStream::MakeRawStream(data, info);
866 break;
867 default:
868 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "do not support type = %d", streamType_);
869 break;
870 }
871 if (stream == nullptr) {
872 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "IStream construct error");
873 return nullptr;
874 }
875
876 return stream;
877 }
878
RecvStreamLen()879 int VtpStreamSocket::RecvStreamLen()
880 {
881 int hdrSize = FRAME_HEADER_LEN;
882 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
883 hdrSize = streamHdrSize_;
884 }
885
886 int len = -1;
887 int timeout = -1;
888 auto buffer = std::make_unique<char[]>(hdrSize);
889 if (EpollTimeout(streamFd_, timeout) == 0) {
890 do {
891 len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
892 } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
893 }
894 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv frame header, len = %d, scene:%d", len, scene_);
895
896 if (len <= 0) {
897 return -1;
898 }
899
900 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
901 std::lock_guard<std::mutex> guard(streamSocketLock_);
902 if (streamReceiver_ != nullptr) {
903 return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
904 }
905 }
906
907 return ntohl(*reinterpret_cast<int *>(buffer.get()));
908 }
909
DoStreamRecv()910 void VtpStreamSocket::DoStreamRecv()
911 {
912 while (isStreamRecv_) {
913 std::unique_ptr<char[]> dataBuffer = nullptr;
914 std::unique_ptr<char[]> extBuffer = nullptr;
915 int extLen = 0;
916 StreamFrameInfo info = {};
917 int dataLength = 0;
918 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv stream");
919 dataLength = VtpStreamSocket::RecvStreamLen();
920 if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
921 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame length error, dataLength = %d", dataLength);
922 break;
923 }
924 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
925 "recv a new frame, dataLength = %d, stream type:%d", dataLength, streamType_);
926 dataBuffer = VtpStreamSocket::RecvStream(dataLength);
927
928 if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
929 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "recv common stream");
930 int decryptedLength = dataLength;
931 auto decryptedBuffer = std::move(dataBuffer);
932
933 int plainDataLength = decryptedLength - GetEncryptOverhead();
934 std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
935 ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
936 if (decLen != plainDataLength) {
937 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
938 "Decrypt failed, dataLength = %d, decryptedLen = %zd", plainDataLength, decLen);
939 break;
940 }
941 auto header = plainData.get();
942 StreamDepacketizer decode(streamType_);
943 decode.DepacketizeHeader(header);
944
945 auto buffer = plainData.get() + sizeof(CommonHeader);
946 decode.DepacketizeBuffer(buffer);
947
948 extBuffer = decode.GetUserExt();
949 extLen = decode.GetUserExtSize();
950 info.seqNum = decode.GetSeqNum();
951 info.streamId = decode.GetStreamId();
952 dataBuffer = decode.GetData();
953 dataLength = decode.GetDataLength();
954 if (dataLength <= 0) {
955 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
956 "common depacketize error, dataLength = %d", dataLength);
957 break;
958 }
959 }
960
961 StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
962 std::unique_ptr<IStream> stream = MakeStreamData(data, info);
963 if (stream == nullptr) {
964 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "MakeStreamData failed, stream == nullptr");
965 break;
966 }
967
968 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
969 "recv frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
970
971 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
972 std::lock_guard<std::mutex> guard(streamSocketLock_);
973 if (streamReceiver_ != nullptr) {
974 streamReceiver_->OnStreamReceived(std::move(stream));
975 continue;
976 }
977 }
978
979 PutStream(std::move(stream));
980 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG,
981 "put frame done, dataLength = %d, stream type:%d", dataLength, streamType_);
982 }
983 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "recv thread exit");
984 }
985
RecvStream(int dataLength)986 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int dataLength)
987 {
988 auto buffer = std::make_unique<char[]>(dataLength);
989 int recvLen = 0;
990 while (recvLen < dataLength) {
991 int ret = -1;
992 int timeout = -1;
993
994 if (EpollTimeout(streamFd_, timeout) == 0) {
995 do {
996 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
997 } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
998 }
999
1000 if (ret == -1) {
1001 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "read frame failed, errno: %d", FtGetErrno());
1002 return nullptr;
1003 }
1004
1005 recvLen += ret;
1006 }
1007 return std::unique_ptr<char[]>(buffer.release());
1008 }
1009
SetDefaultConfig(int fd)1010 void VtpStreamSocket::SetDefaultConfig(int fd)
1011 {
1012 if (!SetIpTos(fd, StreamAttr(static_cast<int>(IPTOS_LOWDELAY)))) {
1013 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "SetIpTos failed");
1014 }
1015
1016 if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int>(DEFAULT_UDP_BUFFER_SIZE)))) {
1017 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send buff failed");
1018 }
1019
1020 if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1021 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set recv cache failed");
1022 }
1023
1024 if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1025 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Set send cache failed");
1026 }
1027 }
1028
SetIpTos(int fd,const StreamAttr & tos)1029 bool VtpStreamSocket::SetIpTos(int fd, const StreamAttr &tos)
1030 {
1031 auto tmp = tos.GetIntValue();
1032 if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1033 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "SetIpTos wrong! fd=%d, errorcode=%d", fd, FtGetErrno());
1034 return false;
1035 }
1036
1037 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Success to set ip tos: fd=%d, tos=%d", fd, tmp);
1038 return true;
1039 }
1040
GetIpTos(int type) const1041 StreamAttr VtpStreamSocket::GetIpTos(int type) const
1042 {
1043 static_cast<void>(type);
1044 int tos;
1045 int size = sizeof(tos);
1046
1047 if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1048 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "FtGetSockOpt errorcode = %d", FtGetErrno());
1049 return std::move(StreamAttr());
1050 }
1051
1052 return std::move(StreamAttr(tos));
1053 }
1054
GetStreamSocketFd(int type) const1055 StreamAttr VtpStreamSocket::GetStreamSocketFd(int type) const
1056 {
1057 static_cast<void>(type);
1058 return std::move(StreamAttr(streamFd_));
1059 }
1060
GetListenSocketFd(int type) const1061 StreamAttr VtpStreamSocket::GetListenSocketFd(int type) const
1062 {
1063 static_cast<void>(type);
1064 return std::move(StreamAttr(listenFd_));
1065 }
1066
SetSocketBoundInner(int fd,std::string ip) const1067 bool VtpStreamSocket::SetSocketBoundInner(int fd, std::string ip) const
1068 {
1069 auto boundIp = (ip == "") ? localIpPort_.ip : ip;
1070 struct ifaddrs *ifList = nullptr;
1071 if (getifaddrs(&ifList) < 0) {
1072 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1073 "get interface address return error %d (%s)", errno, strerror(errno));
1074 return false;
1075 }
1076
1077 struct ifaddrs *ifa = nullptr;
1078 for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1079 if (ifa->ifa_addr == nullptr) {
1080 continue;
1081 }
1082 if (ifa->ifa_addr->sa_family != AF_INET) {
1083 continue;
1084 }
1085
1086 char host[ADDR_MAX_SIZE];
1087 std::string devName(ifa->ifa_name);
1088 if (strcmp(boundIp.c_str(), inet_ntop(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1089 host, ADDR_MAX_SIZE)) == 0) {
1090 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "current use interface %s to bind to socket", ifa->ifa_name);
1091 auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1092 if (err < 0) {
1093 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "fail to set socket binding to device");
1094 freeifaddrs(ifList);
1095 return false;
1096 }
1097 break;
1098 }
1099 }
1100 freeifaddrs(ifList);
1101
1102 return true;
1103 }
1104
SetSocketBindToDevices(int type,const StreamAttr & ip)1105 bool VtpStreamSocket::SetSocketBindToDevices(int type, const StreamAttr &ip)
1106 {
1107 static_cast<void>(type);
1108 auto tmp = ip.GetStrValue();
1109 auto boundIp = (tmp == "") ? localIpPort_.ip : tmp;
1110 return SetSocketBoundInner(streamFd_, boundIp);
1111 }
1112
SetVtpStackConfigDelayed(int type,const StreamAttr & value)1113 bool VtpStreamSocket::SetVtpStackConfigDelayed(int type, const StreamAttr &value)
1114 {
1115 std::unique_lock<std::mutex> lock(streamSocketLock_);
1116 if (streamFd_ == -1) {
1117 configCv_.wait(lock, [this] { return streamFd_ != -1; });
1118 }
1119 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config, streamFd = %d", streamFd_);
1120 return SetVtpStackConfig(type, value);
1121 }
1122
SetVtpStackConfig(int type,const StreamAttr & value)1123 bool VtpStreamSocket::SetVtpStackConfig(int type, const StreamAttr &value)
1124 {
1125 if (streamFd_ == -1) {
1126 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "set vtp stack config when streamFd is legal");
1127 auto self = GetSelf();
1128 std::thread([self, type, value]() { self->SetVtpStackConfigDelayed(type, value); }).detach();
1129 return true;
1130 }
1131
1132 if (value.GetType() == INT_TYPE) {
1133 int intVal = value.GetIntValue();
1134 int ret = FtConfigSet(type, &intVal, &streamFd_);
1135 if (ret != 0) {
1136 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1137 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1138 return false;
1139 }
1140
1141 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1142 "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, intVal);
1143 return true;
1144 }
1145
1146 if (value.GetType() == BOOL_TYPE) {
1147 bool flag = value.GetBoolValue();
1148 int ret = FtConfigSet(type, &flag, &streamFd_);
1149 if (ret != 0) {
1150 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1151 "FtConfigSet failed, type = %d, errorcode = %d", type, FtGetErrno());
1152 return false;
1153 }
1154
1155 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO,
1156 "setVtpConfig(%d) success, fd= %d, value= %d", type, streamFd_, flag);
1157 return true;
1158 }
1159
1160 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "UNKNOWN TYPE!");
1161 return false;
1162 }
1163
GetVtpStackConfig(int type) const1164 StreamAttr VtpStreamSocket::GetVtpStackConfig(int type) const
1165 {
1166 int intVal = -1;
1167 int configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1168 int ret = FtConfigGet(type, &intVal, &configFd);
1169 if (ret != 0) {
1170 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR,
1171 "FtConfigGet failed, type = %d, errorcode = %d", type, FtGetErrno());
1172 return std::move(StreamAttr());
1173 }
1174
1175 int valType = ValueType::UNKNOWN;
1176 for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1177 if (it->second != type) {
1178 continue;
1179 }
1180
1181 valType = optFuncMap_.at(it->first).valueType;
1182 break;
1183 }
1184
1185 if (valType != ValueType::UNKNOWN) {
1186 for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1187 if (it->second != type) {
1188 continue;
1189 }
1190
1191 valType = optFuncMap_.at(it->first).valueType;
1192 break;
1193 }
1194 }
1195
1196 if (valType == BOOL_TYPE) {
1197 return std::move(StreamAttr(!!intVal));
1198 }
1199
1200 return std::move(StreamAttr(intVal));
1201 }
1202
SetNonBlockMode(int fd,const StreamAttr & value)1203 bool VtpStreamSocket::SetNonBlockMode(int fd, const StreamAttr &value)
1204 {
1205 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1206 if (flags < 0) {
1207 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to get FtFcntl, flags = %d", flags);
1208 flags = 0;
1209 }
1210 bool nonBlock = value.GetBoolValue();
1211
1212 flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1213 static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1214
1215 FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1216 if (res < 0) {
1217 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "failed to set FtFcntl, res = %d", res);
1218 return false;
1219 }
1220
1221 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Successfully to set fd(%d) nonBlock mode = %d", fd, nonBlock);
1222 return true;
1223 }
1224
GetNonBlockMode(int fd) const1225 StreamAttr VtpStreamSocket::GetNonBlockMode(int fd) const
1226 {
1227 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1228 if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1229 return std::move(StreamAttr(true));
1230 }
1231
1232 return std::move(StreamAttr(false));
1233 }
1234
GetIp(int type) const1235 StreamAttr VtpStreamSocket::GetIp(int type) const
1236 {
1237 if (type == LOCAL_IP) {
1238 return std::move(StreamAttr(localIpPort_.ip));
1239 }
1240
1241 return std::move(StreamAttr(remoteIpPort_.ip));
1242 }
1243
GetPort(int type) const1244 StreamAttr VtpStreamSocket::GetPort(int type) const
1245 {
1246 if (type == LOCAL_PORT) {
1247 return std::move(StreamAttr(localIpPort_.port));
1248 }
1249 return std::move(StreamAttr(remoteIpPort_.port));
1250 }
1251
SetStreamType(int type,const StreamAttr & value)1252 bool VtpStreamSocket::SetStreamType(int type, const StreamAttr &value)
1253 {
1254 if (type != STREAM_TYPE_INT) {
1255 return false;
1256 }
1257
1258 streamType_ = value.GetIntValue();
1259 return true;
1260 }
1261
GetStreamType(int type) const1262 StreamAttr VtpStreamSocket::GetStreamType(int type) const
1263 {
1264 if (type != STREAM_TYPE_INT) {
1265 return std::move(StreamAttr());
1266 }
1267
1268 return std::move(StreamAttr(streamType_));
1269 }
1270
SetStreamScene(int type,const StreamAttr & value)1271 bool VtpStreamSocket::SetStreamScene(int type, const StreamAttr &value)
1272 {
1273 static_cast<void>(type);
1274 if (value.GetType() != INT_TYPE) {
1275 return false;
1276 }
1277 scene_ = value.GetIntValue();
1278 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set scene to %d", scene_);
1279 return true;
1280 }
1281
SetStreamHeaderSize(int type,const StreamAttr & value)1282 bool VtpStreamSocket::SetStreamHeaderSize(int type, const StreamAttr &value)
1283 {
1284 static_cast<void>(type);
1285 if (value.GetType() != INT_TYPE) {
1286 return false;
1287 }
1288 streamHdrSize_ = value.GetIntValue();
1289 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "Set header size to %d", streamHdrSize_);
1290 return true;
1291 }
1292
NotifyStreamListener()1293 void VtpStreamSocket::NotifyStreamListener()
1294 {
1295 while (isStreamRecv_) {
1296 int streamNum = GetStreamNum();
1297 if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1298 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_WARN, "Too many data in receiver, num = %d", streamNum);
1299 }
1300
1301 auto stream = TakeStream();
1302 if (stream == nullptr) {
1303 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Pop stream failed");
1304 break;
1305 }
1306
1307 std::lock_guard<std::mutex> guard(streamSocketLock_);
1308 if (streamReceiver_ != nullptr) {
1309 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener");
1310 streamReceiver_->OnStreamReceived(std::move(stream));
1311 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "notify listener done.");
1312 }
1313 }
1314 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_INFO, "notify thread exit");
1315 }
1316
GetEncryptOverhead() const1317 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1318 {
1319 return OVERHEAD_LEN;
1320 }
1321
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1322 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1323 {
1324 AesGcmCipherKey cipherKey = {0};
1325
1326 if (inLen - OVERHEAD_LEN > outLen) {
1327 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt invalid para.");
1328 return SOFTBUS_ERR;
1329 }
1330
1331 cipherKey.keyLen = SESSION_KEY_LENGTH;
1332 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.c_str(), SESSION_KEY_LENGTH) != EOK) {
1333 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1334 return SOFTBUS_ERR;
1335 }
1336
1337 int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1338 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1339 if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1340 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Encrypt Data fail. %d", ret);
1341 return SOFTBUS_ENCRYPT_ERR;
1342 }
1343 return outLen;
1344 }
1345
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1346 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1347 {
1348 AesGcmCipherKey cipherKey = {0};
1349
1350 if (inLen - OVERHEAD_LEN > outLen) {
1351 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt invalid para.");
1352 return SOFTBUS_ERR;
1353 }
1354
1355 cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1356 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.c_str(), SESSION_KEY_LENGTH) != EOK) {
1357 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "memcpy key error.");
1358 return SOFTBUS_ERR;
1359 }
1360 int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1361 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1362 if (ret != SOFTBUS_OK) {
1363 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_ERROR, "Decrypt Data fail. %d ", ret);
1364 return SOFTBUS_DECRYPT_ERR;
1365 }
1366
1367 return outLen;
1368 }
1369
GetCryptErrorReason(void) const1370 void VtpStreamSocket::GetCryptErrorReason(void) const
1371 {
1372 SoftBusLog(SOFTBUS_LOG_TRAN, SOFTBUS_LOG_DBG, "Unsupport api");
1373 }
1374 } // namespace SoftBus
1375 } // namespace Communication
1376