1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsoftbus_adapter_impl.h"
17
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #include "device_manager.h"
25 #include "dfs_session.h"
26 #include "securec.h"
27 #include "softbus_bus_center.h"
28 #include "softbus_error_code.h"
29
30 #include "devicestatus_define.h"
31 #include "i_ddm_adapter.h"
32 #include "utility.h"
33
34 #undef LOG_TAG
35 #define LOG_TAG "DSoftbusAdapterImpl"
36
37 namespace OHOS {
38 namespace Msdp {
39 namespace DeviceStatus {
40 namespace {
41 #define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
42 #define D_DEV_MGR DistributedHardware::DeviceManager::GetInstance()
43 const std::string CLIENT_SESSION_NAME { "ohos.msdp.device_status.intention.clientsession." };
44 constexpr size_t BIND_STRING_LENGTH { 15 };
45 constexpr size_t DEVICE_NAME_SIZE_MAX { 256 };
46 constexpr size_t PKG_NAME_SIZE_MAX { 65 };
47 constexpr int32_t MIN_BW { 80 * 1024 * 1024 };
48 constexpr int32_t LATENCY { 3000 };
49 constexpr int32_t SOCKET_SERVER { 0 };
50 constexpr int32_t SOCKET_CLIENT { 1 };
51 constexpr int32_t INVALID_SOCKET { -1 };
52 constexpr int32_t HEART_BEAT_INTERVAL_MS { 64 };
53 constexpr int32_t HEART_BEAT_SIZE_BYTE { 28 }; // Ensure size of heartBeat packet is 64Bytes.
54 const std::string HEART_BEAT_THREAD_NAME { "OS_Cooperate_Heart_Beat" };
55 }
56
57 std::mutex DSoftbusAdapterImpl::mutex_;
58 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
59
GetInstance()60 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
61 {
62 if (instance_ == nullptr) {
63 std::lock_guard<std::mutex> lock(mutex_);
64 if (instance_ == nullptr) {
65 instance_ = std::make_shared<DSoftbusAdapterImpl>();
66 }
67 }
68 return instance_;
69 }
70
DestroyInstance()71 void DSoftbusAdapterImpl::DestroyInstance()
72 {
73 std::lock_guard<std::mutex> lock(mutex_);
74 instance_.reset();
75 }
76
~DSoftbusAdapterImpl()77 DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
78 {
79 Disable();
80 }
81
Enable()82 int32_t DSoftbusAdapterImpl::Enable()
83 {
84 CALL_DEBUG_ENTER;
85 InitHeartBeat();
86 std::unique_lock<std::shared_mutex> lock(lock_);
87 return SetupServer();
88 }
89
Disable()90 void DSoftbusAdapterImpl::Disable()
91 {
92 CALL_DEBUG_ENTER;
93 std::unique_lock<std::shared_mutex> lock(lock_);
94 ShutdownServer();
95 }
96
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)97 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
98 {
99 CALL_DEBUG_ENTER;
100 std::unique_lock<std::shared_mutex> lock(lock_);
101 CHKPV(observer);
102 observers_.erase(Observer());
103 observers_.emplace(observer);
104 }
105
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)106 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
107 {
108 CALL_DEBUG_ENTER;
109 std::unique_lock<std::shared_mutex> lock(lock_);
110 if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
111 observers_.erase(iter);
112 }
113 observers_.erase(Observer());
114 }
115
CheckDeviceOnline(const std::string & networkId)116 bool DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
117 {
118 CALL_DEBUG_ENTER;
119 std::vector<DistributedHardware::DmDeviceInfo> deviceList;
120 if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
121 FI_HILOGE("GetTrustedDeviceList failed");
122 return false;
123 }
124 if (deviceList.empty()) {
125 FI_HILOGE("Trust device list size is invalid");
126 return false;
127 }
128 for (const auto &deviceInfo : deviceList) {
129 if (std::string(deviceInfo.networkId) == networkId) {
130 return true;
131 }
132 }
133 return false;
134 }
135
OpenSession(const std::string & networkId)136 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
137 {
138 CALL_DEBUG_ENTER;
139 std::unique_lock<std::shared_mutex> lock(lock_);
140 #ifdef ENABLE_PERFORMANCE_CHECK
141 auto startStamp = std::chrono::steady_clock::now();
142 #endif // ENABLE_PERFORMANCE_CHECK
143 if (!DSoftbusAdapterImpl::CheckDeviceOnline(networkId)) {
144 FI_HILOGE("CheckDeviceOnline failed, networkId:%{public}s", Utility::Anonymize(networkId).c_str());
145 return RET_ERR;
146 }
147 int32_t ret = OpenSessionLocked(networkId);
148 #ifdef ENABLE_PERFORMANCE_CHECK
149 auto openSessionDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
150 std::chrono::steady_clock::now() - startStamp).count();
151 FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed: %{public}lld ms", ret, openSessionDuration);
152 #endif // ENABLE_PERFORMANCE_CHECK
153 return ret;
154 }
155
CloseSession(const std::string & networkId)156 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
157 {
158 CALL_INFO_TRACE;
159 std::unique_lock<std::shared_mutex> lock(lock_);
160 if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
161 ::Shutdown(iter->second.socket_);
162 sessions_.erase(iter);
163 FI_HILOGI("Shutdown session(%{public}d, %{public}s)", iter->second.socket_,
164 Utility::Anonymize(networkId).c_str());
165 }
166 }
167
CloseAllSessions()168 void DSoftbusAdapterImpl::CloseAllSessions()
169 {
170 CALL_INFO_TRACE;
171 std::unique_lock<std::shared_mutex> lock(lock_);
172 CloseAllSessionsLocked();
173 }
174
FindConnection(const std::string & networkId)175 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
176 {
177 CALL_DEBUG_ENTER;
178 auto iter = sessions_.find(networkId);
179 return (iter != sessions_.end() ? iter->second.socket_ : -1);
180 }
181
SendPacket(const std::string & networkId,NetPacket & packet)182 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
183 {
184 CALL_DEBUG_ENTER;
185 std::shared_lock<std::shared_mutex> lock(lock_);
186 int32_t socket = FindConnection(networkId);
187 if (socket < 0) {
188 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
189 return RET_ERR;
190 }
191 StreamBuffer buffer;
192 if (!packet.MakeData(buffer)) {
193 FI_HILOGE("Failed to buffer packet");
194 return RET_ERR;
195 }
196 if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
197 FI_HILOGE("Packet is too large");
198 return RET_ERR;
199 }
200 int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
201 if (ret != SOFTBUS_OK) {
202 FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
203 return RET_ERR;
204 }
205 return RET_OK;
206 }
207
SendParcel(const std::string & networkId,Parcel & parcel)208 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
209 {
210 CALL_DEBUG_ENTER;
211 std::shared_lock<std::shared_mutex> lock(lock_);
212 int32_t socket = FindConnection(networkId);
213 if (socket < 0) {
214 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
215 return RET_ERR;
216 }
217 int32_t ret = ::SendBytes(socket, reinterpret_cast<const void*>(parcel.GetData()), parcel.GetDataSize());
218 if (ret != SOFTBUS_OK) {
219 FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
220 return RET_ERR;
221 }
222 return RET_OK;
223 }
224
BroadcastPacket(NetPacket & packet)225 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
226 {
227 CALL_INFO_TRACE;
228 std::shared_lock<std::shared_mutex> lock(lock_);
229 if (sessions_.empty()) {
230 FI_HILOGE("No session connected");
231 return RET_ERR;
232 }
233 StreamBuffer buffer;
234 if (!packet.MakeData(buffer)) {
235 FI_HILOGE("Failed to buffer packet");
236 return RET_ERR;
237 }
238 if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
239 FI_HILOGE("Packet is too large");
240 return RET_ERR;
241 }
242 for (const auto &elem : sessions_) {
243 int32_t socket = elem.second.socket_;
244 if (socket < 0) {
245 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
246 continue;
247 }
248 if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
249 FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
250 continue;
251 }
252 FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
253 }
254 return RET_OK;
255 }
256
HasSessionExisted(const std::string & networkId)257 bool DSoftbusAdapterImpl::HasSessionExisted(const std::string &networkId)
258 {
259 CALL_DEBUG_ENTER;
260 auto iter = sessions_.find(networkId);
261 return (iter != sessions_.end() && iter->second.socket_ != INVALID_SOCKET);
262 }
263
OnBindLink(int32_t socket,PeerSocketInfo info)264 static void OnBindLink(int32_t socket, PeerSocketInfo info)
265 {
266 DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
267 }
268
OnShutdownLink(int32_t socket,ShutdownReason reason)269 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
270 {
271 DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
272 }
273
OnBytesAvailable(int32_t socket,const void * data,uint32_t dataLen)274 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
275 {
276 DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
277 }
278
OnBind(int32_t socket,PeerSocketInfo info)279 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
280 {
281 CALL_INFO_TRACE;
282 std::unique_lock<std::shared_mutex> lock(lock_);
283 std::string networkId = info.networkId;
284 FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
285 if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
286 if (iter->second.socket_ == socket) {
287 FI_HILOGI("(%{public}d, %{public}s) has bound", iter->second.socket_,
288 Utility::Anonymize(networkId).c_str());
289 return;
290 }
291 FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
292 sessions_.erase(iter);
293 }
294 ConfigTcpAlive(socket);
295 sessions_.emplace(networkId, Session(socket));
296
297 for (const auto &item : observers_) {
298 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
299 if (observer != nullptr) {
300 FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
301 observer->OnBind(networkId);
302 }
303 }
304 }
305
OnShutdown(int32_t socket,ShutdownReason reason)306 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
307 {
308 CALL_INFO_TRACE;
309 std::unique_lock<std::shared_mutex> lock(lock_);
310 auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(),
311 [socket](const auto &item) {
312 return (item.second.socket_ == socket);
313 });
314 if (iter == sessions_.cend()) {
315 FI_HILOGD("Session(%{public}d) is not bound", socket);
316 return;
317 }
318 std::string networkId = iter->first;
319 sessions_.erase(iter);
320 FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
321
322 for (const auto &item : observers_) {
323 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
324 if (observer != nullptr) {
325 FI_HILOGD("Notify shutdown of session(%{public}d, %{public}s)",
326 socket, Utility::Anonymize(networkId).c_str());
327 observer->OnShutdown(networkId);
328 }
329 }
330 }
331
OnBytes(int32_t socket,const void * data,uint32_t dataLen)332 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
333 {
334 CALL_DEBUG_ENTER;
335 std::shared_lock<std::shared_mutex> lock(lock_);
336 auto iter = std::find_if(sessions_.begin(), sessions_.end(),
337 [socket](const auto &item) {
338 return (item.second.socket_ == socket);
339 });
340 if (iter == sessions_.end()) {
341 FI_HILOGE("Invalid socket: %{public}d", socket);
342 return;
343 }
344 const std::string networkId = iter->first;
345
346 if (*reinterpret_cast<const uint32_t*>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
347 CircleStreamBuffer &circleBuffer = iter->second.buffer_;
348
349 if (!circleBuffer.Write(reinterpret_cast<const char*>(data), dataLen)) {
350 FI_HILOGE("Failed to write buffer");
351 }
352 HandleSessionData(networkId, circleBuffer);
353 } else {
354 HandleRawData(networkId, data, dataLen);
355 }
356 }
357
InitSocket(SocketInfo info,int32_t socketType,int32_t & socket)358 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
359 {
360 CALL_INFO_TRACE;
361 socket = ::Socket(info);
362 if (socket < 0) {
363 FI_HILOGE("DSOFTBUS::Socket failed");
364 return RET_ERR;
365 }
366 QosTV socketQos[] {
367 { .qos = QOS_TYPE_MIN_BW, .value = MIN_BW },
368 { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
369 { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
370 };
371 ISocketListener listener {
372 .OnBind = OnBindLink,
373 .OnShutdown = OnShutdownLink,
374 .OnBytes = OnBytesAvailable,
375 };
376 int32_t ret { -1 };
377
378 if (socketType == SOCKET_SERVER) {
379 ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
380 if (ret != 0) {
381 FI_HILOGE("DSOFTBUS::Listen failed");
382 }
383 } else if (socketType == SOCKET_CLIENT) {
384 ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
385 if (ret != 0) {
386 FI_HILOGE("DSOFTBUS::Bind failed");
387 }
388 }
389 if (ret != 0) {
390 ::Shutdown(socket);
391 socket = -1;
392 return ret;
393 }
394 return RET_OK;
395 }
396
SetupServer()397 int32_t DSoftbusAdapterImpl::SetupServer()
398 {
399 CALL_INFO_TRACE;
400 if (socketFd_ > 0) {
401 return RET_OK;
402 }
403 char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
404 char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
405 FI_HILOGI("Server session name: \'%{public}s\'", name);
406 FI_HILOGI("Package name: \'%{public}s\'", pkgName);
407 SocketInfo info {
408 .name = name,
409 .pkgName = pkgName,
410 .dataType = DATA_TYPE_BYTES
411 };
412 int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
413 if (ret != RET_OK) {
414 FI_HILOGE("Failed to setup server");
415 return ret;
416 }
417 return RET_OK;
418 }
419
ShutdownServer()420 void DSoftbusAdapterImpl::ShutdownServer()
421 {
422 CALL_INFO_TRACE;
423 CloseAllSessionsLocked();
424 if (socketFd_ > 0) {
425 ::Shutdown(socketFd_);
426 socketFd_ = -1;
427 }
428 }
429
OpenSessionLocked(const std::string & networkId)430 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
431 {
432 CALL_DEBUG_ENTER;
433 if (sessions_.find(networkId) != sessions_.end()) {
434 FI_HILOGD("InputSoftbus session has already opened");
435 return RET_OK;
436 }
437 std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
438 char name[DEVICE_NAME_SIZE_MAX] {};
439 if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
440 FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
441 return RET_ERR;
442 }
443 char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
444 char peerNetworkId[PKG_NAME_SIZE_MAX] {};
445 if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
446 FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
447 return RET_ERR;
448 }
449 char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
450 FI_HILOGI("Client session name: \'%{public}s\'", name);
451 FI_HILOGI("Peer name: \'%{public}s\'", peerName);
452 FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
453 FI_HILOGI("Package name: \'%{public}s\'", pkgName);
454 SocketInfo info {
455 .name = name,
456 .peerName = peerName,
457 .peerNetworkId = peerNetworkId,
458 .pkgName = pkgName,
459 .dataType = DATA_TYPE_BYTES
460 };
461 int32_t socket { -1 };
462
463 int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
464 if (ret != RET_OK) {
465 FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
466 return ret;
467 }
468 ConfigTcpAlive(socket);
469 FI_HILOGI("Connected to (%{public}s,%{public}d)", Utility::Anonymize(networkId).c_str(), socket);
470 sessions_.emplace(networkId, Session(socket));
471 OnConnectedLocked(networkId);
472 return RET_OK;
473 }
474
OnConnectedLocked(const std::string & networkId)475 void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
476 {
477 CALL_INFO_TRACE;
478 for (const auto &item : observers_) {
479 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
480 CHKPC(observer);
481 FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
482 observer->OnConnected(networkId);
483 }
484 }
485
CloseAllSessionsLocked()486 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
487 {
488 std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
489 ::Shutdown(item.second.socket_);
490 FI_HILOGI("Shutdown connection with (%{public}s,%{public}d)",
491 Utility::Anonymize(item.first).c_str(), item.second.socket_);
492 });
493 sessions_.clear();
494 }
495
ConfigTcpAlive(int32_t socket)496 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
497 {
498 CALL_DEBUG_ENTER;
499 if (socket < 0) {
500 FI_HILOGW("Config tcp alive, invalid sessionId");
501 return;
502 }
503 int32_t handle { -1 };
504 int32_t result = GetSessionHandle(socket, &handle);
505 if (result != RET_OK) {
506 FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
507 return;
508 }
509 int32_t keepAliveTimeout { 10 };
510 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
511 if (result != RET_OK) {
512 FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
513 return;
514 }
515 int32_t keepAliveCount { 5 };
516 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
517 if (result != RET_OK) {
518 FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
519 return;
520 }
521 int32_t interval { 1 };
522 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
523 if (result != RET_OK) {
524 FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
525 return;
526 }
527 int32_t enable { 1 };
528 result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
529 if (result != RET_OK) {
530 FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
531 return;
532 }
533 int32_t TimeoutMs { 15000 };
534 result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
535 if (result != RET_OK) {
536 FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
537 return;
538 }
539 }
540
HandleSessionData(const std::string & networkId,CircleStreamBuffer & circleBuffer)541 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
542 {
543 CALL_DEBUG_ENTER;
544 while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
545 const char *buf = circleBuffer.ReadBuf();
546 const PackHead *head = reinterpret_cast<const PackHead *>(buf);
547
548 if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
549 FI_HILOGE("Corrupted net packet");
550 break;
551 }
552 if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
553 FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
554 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
555 break;
556 }
557 NetPacket packet(head->idMsg);
558
559 if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
560 FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
561 break;
562 }
563 circleBuffer.SeekReadPos(packet.GetPacketLength());
564 HandlePacket(networkId, packet);
565 }
566 }
567
HandlePacket(const std::string & networkId,NetPacket & packet)568 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
569 {
570 CALL_DEBUG_ENTER;
571 for (const auto &item : observers_) {
572 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
573 if ((observer != nullptr) &&
574 observer->OnPacket(networkId, packet)) {
575 return;
576 }
577 }
578 }
579
HandleRawData(const std::string & networkId,const void * data,uint32_t dataLen)580 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
581 {
582 CALL_DEBUG_ENTER;
583 for (const auto &item : observers_) {
584 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
585 if ((observer != nullptr) &&
586 observer->OnRawData(networkId, data, dataLen)) {
587 return;
588 }
589 }
590 }
591
InitHeartBeat()592 void DSoftbusAdapterImpl::InitHeartBeat()
593 {
594 auto runner = AppExecFwk::EventRunner::Create(HEART_BEAT_THREAD_NAME, AppExecFwk::ThreadMode::FFRT);
595 CHKPV(runner);
596 eventHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
597 char heartBeatContent[HEART_BEAT_SIZE_BYTE] { 'a' };
598 heartBeatPacket_.Write(heartBeatContent, HEART_BEAT_SIZE_BYTE);
599 }
600
StartHeartBeat(const std::string & networkId)601 void DSoftbusAdapterImpl::StartHeartBeat(const std::string &networkId)
602 {
603 CALL_INFO_TRACE;
604 if (GetHeartBeatState(networkId)) {
605 FI_HILOGI("HeartBeat to %{public}s running ready", Utility::Anonymize(networkId).c_str());
606 return;
607 }
608 if (KeepHeartBeating(networkId) != RET_OK) {
609 UpdateHeartBeatState(networkId, false);
610 FI_HILOGE("StartHeartBeat to %{public}s failed", Utility::Anonymize(networkId).c_str());
611 return;
612 }
613 UpdateHeartBeatState(networkId, true);
614 FI_HILOGI("StartHeartBeat to %{public}s successfully", Utility::Anonymize(networkId).c_str());
615 }
616
StopHeartBeat(const std::string & networkId)617 void DSoftbusAdapterImpl::StopHeartBeat(const std::string &networkId)
618 {
619 UpdateHeartBeatState(networkId, false);
620 FI_HILOGI("StopHeartBeat to %{public}s successfully", Utility::Anonymize(networkId).c_str());
621 }
622
KeepHeartBeating(const std::string & networkId)623 int32_t DSoftbusAdapterImpl::KeepHeartBeating(const std::string &networkId)
624 {
625 if (SendPacket(networkId, heartBeatPacket_) != RET_OK) {
626 FI_HILOGE("HeartBeat to %{public}s failed, stop it", Utility::Anonymize(networkId).c_str());
627 UpdateHeartBeatState(networkId, false);
628 return RET_ERR;
629 }
630 CHKPR(eventHandler_, RET_ERR);
631 if (!eventHandler_->PostTask(
632 [this, networkId]() {
633 if (GetHeartBeatState(networkId)) {
634 this->KeepHeartBeating(networkId);
635 } else {
636 UpdateHeartBeatState(networkId, false);
637 FI_HILOGE("Switch off, Stop heartBeat to %{public}s", Utility::Anonymize(networkId).c_str());
638 }
639 }, HEART_BEAT_INTERVAL_MS)) {
640 FI_HILOGE("PostTask heartBeat to %{public}s failed", Utility::Anonymize(networkId).c_str());
641 UpdateHeartBeatState(networkId, false);
642 return RET_ERR;
643 }
644 return RET_OK;
645 }
646
UpdateHeartBeatState(const std::string & networkId,bool state)647 void DSoftbusAdapterImpl::UpdateHeartBeatState(const std::string &networkId, bool state)
648 {
649 std::unique_lock<std::shared_mutex> lock(heartBeatLock_);
650 heartBeatStates_[networkId] = state;
651 FI_HILOGI("Update %{public}s state:%{public}s", Utility::Anonymize(networkId).c_str(), state ? "true" : "false");
652 }
653
GetHeartBeatState(const std::string & networkId)654 bool DSoftbusAdapterImpl::GetHeartBeatState(const std::string &networkId)
655 {
656 std::shared_lock<std::shared_mutex> lock(heartBeatLock_);
657 if (heartBeatStates_.find(networkId) != heartBeatStates_.end()) {
658 return heartBeatStates_[networkId];
659 }
660 return false;
661 }
662 } // namespace DeviceStatus
663 } // namespace Msdp
664 } // namespace OHOS
665