• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsoftbus_adapter_impl.h"
17 
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21 
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #include "device_manager.h"
25 #include "dfs_session.h"
26 #include "securec.h"
27 #include "softbus_bus_center.h"
28 #include "softbus_error_code.h"
29 
30 #include "devicestatus_define.h"
31 #include "i_ddm_adapter.h"
32 #include "utility.h"
33 #include "inner_socket.h"
34 
35 #undef LOG_TAG
36 #define LOG_TAG "DSoftbusAdapterImpl"
37 
38 namespace OHOS {
39 namespace Msdp {
40 namespace DeviceStatus {
namespace {
// Name the server socket is created with, and the peer name clients connect to.
// Kept as a macro because it brace-initializes plain char arrays.
#define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
// Shorthand for the DeviceManager singleton.
#define D_DEV_MGR DistributedHardware::DeviceManager::GetInstance()
// Prefix of client session names; the leading characters of the peer network id are appended.
const std::string CLIENT_SESSION_NAME { "ohos.msdp.device_status.intention.clientsession." };
// Number of leading network-id characters appended to CLIENT_SESSION_NAME.
constexpr size_t BIND_STRING_LENGTH { 15 };
// Capacity of the buffers that hold session names.
constexpr size_t DEVICE_NAME_SIZE_MAX { 256 };
// Capacity of the buffers that hold the package name and the peer network id.
constexpr size_t PKG_NAME_SIZE_MAX { 65 };
// Value requested for QOS_TYPE_MIN_BW.
constexpr int32_t MIN_BW { 80 * 1024 * 1024 };
// Value requested for both QOS_TYPE_MAX_LATENCY and QOS_TYPE_MIN_LATENCY.
constexpr int32_t LATENCY { 3000 };
// Socket roles understood by InitSocket(): listen as a server or bind as a client.
constexpr int32_t SOCKET_SERVER { 0 };
constexpr int32_t SOCKET_CLIENT { 1 };
// Socket value that marks a session as having no usable socket.
constexpr int32_t INVALID_SOCKET { -1 };
// Delay passed to PostTask between two heartbeat transmissions.
constexpr int32_t HEART_BEAT_INTERVAL_MS { 64 };
constexpr int32_t HEART_BEAT_SIZE_BYTE { 28 }; // Ensure size of heartBeat packet is 64Bytes.
// Name given to the event runner that drives the heartbeat tasks.
const std::string HEART_BEAT_THREAD_NAME { "OS_Cooperate_Heart_Beat" };
// Key looked up in the device's extra-data JSON; constexpr so the pointer itself cannot be reseated.
constexpr const char* PARAM_KEY_OS_TYPE { "OS_TYPE" };
// Only peers reporting this OS type are allowed to bind.
constexpr int32_t OS_TYPE_OH { 10 };
// Option id handed to ::SetSocketOpt() for the flow-info setting.
constexpr int32_t OPT_TYPE_FLOW_INFO { 10005 };
}
60 
61 std::mutex DSoftbusAdapterImpl::mutex_;
62 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
63 
GetInstance()64 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
65 {
66     if (instance_ == nullptr) {
67         std::lock_guard<std::mutex> lock(mutex_);
68         if (instance_ == nullptr) {
69             instance_ = std::make_shared<DSoftbusAdapterImpl>();
70         }
71     }
72     return instance_;
73 }
74 
DestroyInstance()75 void DSoftbusAdapterImpl::DestroyInstance()
76 {
77     std::lock_guard<std::mutex> lock(mutex_);
78     instance_.reset();
79 }
80 
~DSoftbusAdapterImpl()81 DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
82 {
83     Disable();
84 }
85 
Enable()86 int32_t DSoftbusAdapterImpl::Enable()
87 {
88     CALL_DEBUG_ENTER;
89     InitHeartBeat();
90     std::unique_lock<std::shared_mutex> lock(lock_);
91     return SetupServer();
92 }
93 
Disable()94 void DSoftbusAdapterImpl::Disable()
95 {
96     CALL_DEBUG_ENTER;
97     std::unique_lock<std::shared_mutex> lock(lock_);
98     ShutdownServer();
99 }
100 
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)101 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
102 {
103     CALL_DEBUG_ENTER;
104     std::unique_lock<std::shared_mutex> lock(lock_);
105     CHKPV(observer);
106     observers_.erase(Observer());
107     observers_.emplace(observer);
108 }
109 
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)110 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
111 {
112     CALL_DEBUG_ENTER;
113     std::unique_lock<std::shared_mutex> lock(lock_);
114     if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
115         observers_.erase(iter);
116     }
117     observers_.erase(Observer());
118 }
119 
CheckDeviceOnline(const std::string & networkId)120 int32_t DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
121 {
122     CALL_DEBUG_ENTER;
123     std::vector<DistributedHardware::DmDeviceInfo> deviceList;
124     if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
125         FI_HILOGE("GetTrustedDeviceList failed");
126         return RET_ERR;
127     }
128     if (deviceList.empty()) {
129         FI_HILOGE("Trust device list size is invalid");
130         return RET_ERR;
131     }
132     for (const auto &deviceInfo : deviceList) {
133         if (std::string(deviceInfo.networkId) == networkId) {
134             return RET_OK;
135         }
136     }
137     return RET_ERR;
138 }
139 
OpenSession(const std::string & networkId)140 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
141 {
142     CALL_DEBUG_ENTER;
143     std::unique_lock<std::shared_mutex> lock(lock_);
144 #ifdef ENABLE_PERFORMANCE_CHECK
145     auto startStamp = std::chrono::steady_clock::now();
146 #endif // ENABLE_PERFORMANCE_CHECK
147     int32_t ret = OpenSessionLocked(networkId);
148 #ifdef ENABLE_PERFORMANCE_CHECK
149     auto openSessionDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
150         std::chrono::steady_clock::now() - startStamp).count();
151     FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed: %{public}lld ms", ret, openSessionDuration);
152 #endif // ENABLE_PERFORMANCE_CHECK
153     return ret;
154 }
155 
CloseSession(const std::string & networkId)156 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
157 {
158     CALL_INFO_TRACE;
159     std::unique_lock<std::shared_mutex> lock(lock_);
160     if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
161         ::Shutdown(iter->second.socket_);
162         sessions_.erase(iter);
163         FI_HILOGI("Shutdown session(%{public}d, %{public}s)", iter->second.socket_,
164             Utility::Anonymize(networkId).c_str());
165     }
166 }
167 
CloseAllSessions()168 void DSoftbusAdapterImpl::CloseAllSessions()
169 {
170     CALL_INFO_TRACE;
171     std::unique_lock<std::shared_mutex> lock(lock_);
172     CloseAllSessionsLocked();
173 }
174 
FindConnection(const std::string & networkId)175 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
176 {
177     CALL_DEBUG_ENTER;
178     auto iter = sessions_.find(networkId);
179     return (iter != sessions_.end() ? iter->second.socket_ : -1);
180 }
181 
SendPacket(const std::string & networkId,NetPacket & packet)182 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
183 {
184     CALL_DEBUG_ENTER;
185     std::shared_lock<std::shared_mutex> lock(lock_);
186     int32_t socket = FindConnection(networkId);
187     if (socket < 0) {
188         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
189         return RET_ERR;
190     }
191     StreamBuffer buffer;
192     if (!packet.MakeData(buffer)) {
193         FI_HILOGE("Failed to buffer packet");
194         return RET_ERR;
195     }
196     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
197         FI_HILOGE("Packet is too large");
198         return RET_ERR;
199     }
200     int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
201     if (ret != SOFTBUS_OK) {
202         FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
203         return RET_ERR;
204     }
205     return RET_OK;
206 }
207 
SendParcel(const std::string & networkId,Parcel & parcel)208 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
209 {
210     CALL_DEBUG_ENTER;
211     std::shared_lock<std::shared_mutex> lock(lock_);
212     int32_t socket = FindConnection(networkId);
213     if (socket < 0) {
214         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
215         return RET_ERR;
216     }
217     int32_t ret = ::SendBytes(socket, reinterpret_cast<const void*>(parcel.GetData()), parcel.GetDataSize());
218     if (ret != SOFTBUS_OK) {
219         FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
220         return RET_ERR;
221     }
222     return RET_OK;
223 }
224 
BroadcastPacket(NetPacket & packet)225 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
226 {
227     CALL_INFO_TRACE;
228     std::shared_lock<std::shared_mutex> lock(lock_);
229     if (sessions_.empty()) {
230         FI_HILOGE("No session connected");
231         return RET_ERR;
232     }
233     StreamBuffer buffer;
234     if (!packet.MakeData(buffer)) {
235         FI_HILOGE("Failed to buffer packet");
236         return RET_ERR;
237     }
238     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
239         FI_HILOGE("Packet is too large");
240         return RET_ERR;
241     }
242     for (const auto &elem : sessions_) {
243         int32_t socket = elem.second.socket_;
244         if (socket < 0) {
245             FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
246             continue;
247         }
248         if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
249             FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
250             continue;
251         }
252         FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
253     }
254     return RET_OK;
255 }
256 
HasSessionExisted(const std::string & networkId)257 bool DSoftbusAdapterImpl::HasSessionExisted(const std::string &networkId)
258 {
259     CALL_DEBUG_ENTER;
260     auto iter = sessions_.find(networkId);
261     return (iter != sessions_.end() && iter->second.socket_ != INVALID_SOCKET);
262 }
263 
OnBindLink(int32_t socket,PeerSocketInfo info)264 static void OnBindLink(int32_t socket, PeerSocketInfo info)
265 {
266     DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
267 }
268 
OnShutdownLink(int32_t socket,ShutdownReason reason)269 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
270 {
271     DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
272 }
273 
OnBytesAvailable(int32_t socket,const void * data,uint32_t dataLen)274 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
275 {
276     DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
277 }
278 
OnBind(int32_t socket,PeerSocketInfo info)279 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
280 {
281     CALL_INFO_TRACE;
282     std::unique_lock<std::shared_mutex> lock(lock_);
283     std::string networkId = info.networkId;
284     FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
285     if (!CheckDeviceOsType(networkId)) {
286         FI_HILOGE("Refuse bind");
287         ::Shutdown(socket);
288     }
289     if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
290         if (iter->second.socket_ == socket) {
291             FI_HILOGI("(%{public}d, %{public}s) has bound", iter->second.socket_,
292                 Utility::Anonymize(networkId).c_str());
293             return;
294         }
295         FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
296         sessions_.erase(iter);
297     }
298     ConfigTcpAlive(socket);
299     sessions_.emplace(networkId, Session(socket));
300 
301     for (const auto &item : observers_) {
302         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
303         if (observer != nullptr) {
304             FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
305             observer->OnBind(networkId);
306         }
307     }
308 }
309 
OnShutdown(int32_t socket,ShutdownReason reason)310 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
311 {
312     CALL_INFO_TRACE;
313     std::unique_lock<std::shared_mutex> lock(lock_);
314     auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(),
315         [socket](const auto &item) {
316             return (item.second.socket_ == socket);
317         });
318     if (iter == sessions_.cend()) {
319         FI_HILOGD("Session(%{public}d) is not bound", socket);
320         return;
321     }
322     std::string networkId = iter->first;
323     sessions_.erase(iter);
324     FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
325 
326     for (const auto &item : observers_) {
327         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
328         if (observer != nullptr) {
329             FI_HILOGD("Notify shutdown of session(%{public}d, %{public}s)",
330                 socket, Utility::Anonymize(networkId).c_str());
331             observer->OnShutdown(networkId);
332         }
333     }
334 }
335 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)336 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
337 {
338     CALL_DEBUG_ENTER;
339     std::shared_lock<std::shared_mutex> lock(lock_);
340     auto iter = std::find_if(sessions_.begin(), sessions_.end(),
341         [socket](const auto &item) {
342             return (item.second.socket_ == socket);
343         });
344     if (iter == sessions_.end()) {
345         FI_HILOGE("Invalid socket: %{public}d", socket);
346         return;
347     }
348     const std::string networkId = iter->first;
349 
350     if (*reinterpret_cast<const uint32_t*>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
351         CircleStreamBuffer &circleBuffer = iter->second.buffer_;
352 
353         if (!circleBuffer.Write(reinterpret_cast<const char*>(data), dataLen)) {
354             FI_HILOGE("Failed to write buffer");
355         }
356         HandleSessionData(networkId, circleBuffer);
357     } else {
358         HandleRawData(networkId, data, dataLen);
359     }
360 }
361 
InitSocket(SocketInfo info,int32_t socketType,int32_t & socket)362 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
363 {
364     CALL_INFO_TRACE;
365     socket = ::Socket(info);
366     if (socket < 0) {
367         FI_HILOGE("DSOFTBUS::Socket failed");
368         return RET_ERR;
369     }
370     QosTV socketQos[] {
371         { .qos = QOS_TYPE_MIN_BW, .value = MIN_BW },
372         { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
373         { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
374     };
375     ISocketListener listener {
376         .OnBind = OnBindLink,
377         .OnShutdown = OnShutdownLink,
378         .OnBytes = OnBytesAvailable,
379     };
380     int32_t ret { -1 };
381 
382     if (socketType == SOCKET_SERVER) {
383         ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
384         if (ret != 0) {
385             FI_HILOGE("DSOFTBUS::Listen failed");
386         }
387     } else if (socketType == SOCKET_CLIENT) {
388         SetSocketOpt(socket);
389         ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
390         if (ret != 0) {
391             FI_HILOGE("DSOFTBUS::Bind failed");
392         }
393     }
394     if (ret != 0) {
395         ::Shutdown(socket);
396         socket = -1;
397         return ret;
398     }
399     return RET_OK;
400 }
401 
SetSocketOpt(int32_t socket)402 void DSoftbusAdapterImpl::SetSocketOpt(int32_t socket)
403 {
404     CALL_INFO_TRACE;
405     if (socket < 0) {
406         FI_HILOGE("DSOFTBUS::Socket failed");
407         return;
408     }
409     TransFlowInfo transInfo = {
410         .flowSize = 0,
411         .sessionType = SHORT_FOREGROUND_SESSION,
412         .flowQosType = LOW_LATENCY_10MS,
413     };
414     if (int32_t ret = ::SetSocketOpt(socket, OPT_LEVEL_SOFTBUS, static_cast<OptType>(OPT_TYPE_FLOW_INFO),
415         static_cast<void *>(&transInfo), sizeof(TransFlowInfo)); ret != RET_OK) {
416         FI_HILOGE("DSOFTBUS::SetSocketOpt failed, ret:%{public}d", ret);
417     }
418 }
419 
SetupServer()420 int32_t DSoftbusAdapterImpl::SetupServer()
421 {
422     CALL_INFO_TRACE;
423     if (socketFd_ > 0) {
424         return RET_OK;
425     }
426     char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
427     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
428     FI_HILOGI("Server session name: \'%{public}s\'", name);
429     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
430     SocketInfo info {
431         .name = name,
432         .pkgName = pkgName,
433         .dataType = DATA_TYPE_BYTES
434     };
435     int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
436     if (ret != RET_OK) {
437         FI_HILOGE("Failed to setup server");
438         return ret;
439     }
440     return RET_OK;
441 }
442 
ShutdownServer()443 void DSoftbusAdapterImpl::ShutdownServer()
444 {
445     CALL_INFO_TRACE;
446     CloseAllSessionsLocked();
447     if (socketFd_ > 0) {
448         ::Shutdown(socketFd_);
449         socketFd_ = -1;
450     }
451 }
452 
OpenSessionLocked(const std::string & networkId)453 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
454 {
455     CALL_DEBUG_ENTER;
456     if (sessions_.find(networkId) != sessions_.end()) {
457         FI_HILOGD("InputSoftbus session has already opened");
458         return RET_OK;
459     }
460     std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
461     char name[DEVICE_NAME_SIZE_MAX] {};
462     if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
463         FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
464         return RET_ERR;
465     }
466     char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
467     char peerNetworkId[PKG_NAME_SIZE_MAX] {};
468     if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
469         FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
470         return RET_ERR;
471     }
472     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
473     FI_HILOGI("Client session name: \'%{public}s\'", name);
474     FI_HILOGI("Peer name: \'%{public}s\'", peerName);
475     FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
476     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
477     SocketInfo info {
478         .name = name,
479         .peerName = peerName,
480         .peerNetworkId = peerNetworkId,
481         .pkgName = pkgName,
482         .dataType = DATA_TYPE_BYTES
483     };
484     int32_t socket { -1 };
485 
486     int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
487     if (ret != RET_OK) {
488         FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
489         return ret;
490     }
491     ConfigTcpAlive(socket);
492     FI_HILOGI("Connected to (%{public}s,%{public}d)", Utility::Anonymize(networkId).c_str(), socket);
493     sessions_.emplace(networkId, Session(socket));
494     OnConnectedLocked(networkId);
495     return RET_OK;
496 }
497 
OnConnectedLocked(const std::string & networkId)498 void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
499 {
500     CALL_INFO_TRACE;
501     for (const auto &item : observers_) {
502         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
503         CHKPC(observer);
504         FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
505         observer->OnConnected(networkId);
506     }
507 }
508 
CloseAllSessionsLocked()509 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
510 {
511     std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
512         ::Shutdown(item.second.socket_);
513         FI_HILOGI("Shutdown connection with (%{public}s,%{public}d)",
514             Utility::Anonymize(item.first).c_str(), item.second.socket_);
515     });
516     sessions_.clear();
517 }
518 
ConfigTcpAlive(int32_t socket)519 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
520 {
521     CALL_DEBUG_ENTER;
522     if (socket < 0) {
523         FI_HILOGW("Config tcp alive, invalid sessionId");
524         return;
525     }
526     int32_t handle { -1 };
527     int32_t result = GetSessionHandle(socket, &handle);
528     if (result != RET_OK) {
529         FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
530         return;
531     }
532     int32_t keepAliveTimeout { 10 };
533     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
534     if (result != RET_OK) {
535         FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
536         return;
537     }
538     int32_t keepAliveCount { 5 };
539     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
540     if (result != RET_OK) {
541         FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
542         return;
543     }
544     int32_t interval { 1 };
545     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
546     if (result != RET_OK) {
547         FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
548         return;
549     }
550     int32_t enable { 1 };
551     result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
552     if (result != RET_OK) {
553         FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
554         return;
555     }
556     int32_t TimeoutMs { 15000 };
557     result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
558     if (result != RET_OK) {
559         FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
560         return;
561     }
562 }
563 
HandleSessionData(const std::string & networkId,CircleStreamBuffer & circleBuffer)564 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
565 {
566     CALL_DEBUG_ENTER;
567     while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
568         const char *buf = circleBuffer.ReadBuf();
569         const PackHead *head = reinterpret_cast<const PackHead *>(buf);
570 
571         if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
572             FI_HILOGE("Corrupted net packet");
573             break;
574         }
575         if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
576             FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
577                 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
578             break;
579         }
580         NetPacket packet(head->idMsg);
581 
582         if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
583             FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
584             break;
585         }
586         circleBuffer.SeekReadPos(packet.GetPacketLength());
587         HandlePacket(networkId, packet);
588     }
589 }
590 
HandlePacket(const std::string & networkId,NetPacket & packet)591 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
592 {
593     CALL_DEBUG_ENTER;
594     for (const auto &item : observers_) {
595         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
596         if ((observer != nullptr) &&
597             observer->OnPacket(networkId, packet)) {
598             return;
599         }
600     }
601 }
602 
HandleRawData(const std::string & networkId,const void * data,uint32_t dataLen)603 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
604 {
605     CALL_DEBUG_ENTER;
606     for (const auto &item : observers_) {
607         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
608         if ((observer != nullptr) &&
609             observer->OnRawData(networkId, data, dataLen)) {
610             return;
611         }
612     }
613 }
614 
InitHeartBeat()615 void DSoftbusAdapterImpl::InitHeartBeat()
616 {
617     auto runner = AppExecFwk::EventRunner::Create(HEART_BEAT_THREAD_NAME, AppExecFwk::ThreadMode::FFRT);
618     CHKPV(runner);
619     eventHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
620     char heartBeatContent[HEART_BEAT_SIZE_BYTE] { 'a' };
621     heartBeatPacket_.Write(heartBeatContent, HEART_BEAT_SIZE_BYTE);
622 }
623 
StartHeartBeat(const std::string & networkId)624 void DSoftbusAdapterImpl::StartHeartBeat(const std::string &networkId)
625 {
626     CALL_INFO_TRACE;
627     if (GetHeartBeatState(networkId)) {
628         FI_HILOGI("HeartBeat to %{public}s running ready", Utility::Anonymize(networkId).c_str());
629         return;
630     }
631     if (KeepHeartBeating(networkId) != RET_OK) {
632         UpdateHeartBeatState(networkId, false);
633         FI_HILOGE("StartHeartBeat to %{public}s failed", Utility::Anonymize(networkId).c_str());
634         return;
635     }
636     UpdateHeartBeatState(networkId, true);
637     FI_HILOGI("StartHeartBeat to %{public}s successfully", Utility::Anonymize(networkId).c_str());
638 }
639 
StopHeartBeat(const std::string & networkId)640 void DSoftbusAdapterImpl::StopHeartBeat(const std::string &networkId)
641 {
642     UpdateHeartBeatState(networkId, false);
643     FI_HILOGI("StopHeartBeat to %{public}s successfully", Utility::Anonymize(networkId).c_str());
644 }
645 
KeepHeartBeating(const std::string & networkId)646 int32_t DSoftbusAdapterImpl::KeepHeartBeating(const std::string &networkId)
647 {
648     if (SendPacket(networkId, heartBeatPacket_) != RET_OK) {
649         FI_HILOGE("HeartBeat to %{public}s failed, stop it", Utility::Anonymize(networkId).c_str());
650         UpdateHeartBeatState(networkId, false);
651         return RET_ERR;
652     }
653     CHKPR(eventHandler_, RET_ERR);
654     if (!eventHandler_->PostTask(
655         [this, networkId]() {
656             if (GetHeartBeatState(networkId)) {
657                 this->KeepHeartBeating(networkId);
658             } else {
659                 UpdateHeartBeatState(networkId, false);
660                 FI_HILOGE("Switch off, Stop heartBeat to %{public}s", Utility::Anonymize(networkId).c_str());
661             }
662         }, HEART_BEAT_INTERVAL_MS)) {
663         FI_HILOGE("PostTask heartBeat to %{public}s failed", Utility::Anonymize(networkId).c_str());
664         UpdateHeartBeatState(networkId, false);
665         return RET_ERR;
666     }
667     return RET_OK;
668 }
669 
UpdateHeartBeatState(const std::string & networkId,bool state)670 void DSoftbusAdapterImpl::UpdateHeartBeatState(const std::string &networkId, bool state)
671 {
672     std::unique_lock<std::shared_mutex> lock(heartBeatLock_);
673     heartBeatStates_[networkId] = state;
674     FI_HILOGI("Update %{public}s state:%{public}s", Utility::Anonymize(networkId).c_str(), state ? "true" : "false");
675 }
676 
GetHeartBeatState(const std::string & networkId)677 bool DSoftbusAdapterImpl::GetHeartBeatState(const std::string &networkId)
678 {
679     std::shared_lock<std::shared_mutex> lock(heartBeatLock_);
680     if (heartBeatStates_.find(networkId) != heartBeatStates_.end()) {
681         return heartBeatStates_[networkId];
682     }
683     return false;
684 }
685 
CheckDeviceOsType(const std::string & networkId)686 bool DSoftbusAdapterImpl::CheckDeviceOsType(const std::string &networkId)
687 {
688     CALL_INFO_TRACE;
689     DistributedHardware::DmDeviceInfo deviceInfo;
690     int32_t res = D_DEV_MGR.GetDeviceInfo(FI_PKG_NAME, networkId, deviceInfo);
691     if (res != ERR_OK) {
692         FI_HILOGE("Get device failed, res:%{public}d", res);
693         return false;
694     }
695     if (deviceInfo.extraData.empty()) {
696         FI_HILOGE("Deviceinfo extradata is empty");
697         return false;
698     }
699     JsonParser extraData(deviceInfo.extraData.c_str());
700     if (!cJSON_IsObject(extraData.Get())) {
701         FI_HILOGE("extraData is not json object");
702         return false;
703     }
704     cJSON *osType = cJSON_GetObjectItemCaseSensitive(extraData.Get(), PARAM_KEY_OS_TYPE);
705     if (cJSON_IsNumber(osType)) {
706         if (osType->valueint != OS_TYPE_OH) {
707             FI_HILOGE("Ostype:%{public}d", osType->valueint);
708             return false;
709         }
710     } else {
711         FI_HILOGE("get ostype error, extraData:%{public}s", deviceInfo.extraData.c_str());
712         return false;
713     }
714     return true;
715 }
716 } // namespace DeviceStatus
717 } // namespace Msdp
718 } // namespace OHOS
719