• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsoftbus_adapter_impl.h"
17 
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21 
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 
25 #include "cooperate_hisysevent.h"
26 #include "device_manager.h"
27 #include "dfs_session.h"
28 #include "securec.h"
29 #include "softbus_bus_center.h"
30 #include "softbus_error_code.h"
31 
32 #include "devicestatus_define.h"
33 #include "i_ddm_adapter.h"
34 #include "utility.h"
35 
36 #undef LOG_TAG
37 #define LOG_TAG "DSoftbusAdapterImpl"
38 
39 namespace OHOS {
40 namespace Msdp {
41 namespace DeviceStatus {
namespace {
// Name of the server-side session; also used as the peer name when binding as a client.
#define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
// Shorthand for the DeviceManager singleton.
#define D_DEV_MGR           DistributedHardware::DeviceManager::GetInstance()
// Prefix of client session names; the leading part of the peer network id is appended to it.
const std::string CLIENT_SESSION_NAME = "ohos.msdp.device_status.intention.clientsession.";
// Number of leading network-id characters appended to CLIENT_SESSION_NAME.
constexpr size_t BIND_STRING_LENGTH = 15;
// Size of the buffers that hold session names.
constexpr size_t DEVICE_NAME_SIZE_MAX = 256;
// Size of the buffers that hold the package name and the peer network id.
constexpr size_t PKG_NAME_SIZE_MAX = 65;
// Value requested for QOS_TYPE_MIN_BW.
constexpr int32_t MIN_BW = 80 * 1024 * 1024;
// Value requested for both QOS_TYPE_MAX_LATENCY and QOS_TYPE_MIN_LATENCY.
constexpr int32_t LATENCY = 3000;
// Socket roles accepted by InitSocket(): listen as server or bind as client.
constexpr int32_t SOCKET_SERVER = 0;
constexpr int32_t SOCKET_CLIENT = 1;
} // namespace
54 
55 std::mutex DSoftbusAdapterImpl::mutex_;
56 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
57 
GetInstance()58 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
59 {
60     if (instance_ == nullptr) {
61         std::lock_guard<std::mutex> lock(mutex_);
62         if (instance_ == nullptr) {
63             instance_ = std::make_shared<DSoftbusAdapterImpl>();
64         }
65     }
66     return instance_;
67 }
68 
DestroyInstance()69 void DSoftbusAdapterImpl::DestroyInstance()
70 {
71     std::lock_guard<std::mutex> lock(mutex_);
72     instance_.reset();
73 }
74 
~DSoftbusAdapterImpl()75 DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
76 {
77     Disable();
78 }
79 
Enable()80 int32_t DSoftbusAdapterImpl::Enable()
81 {
82     CALL_DEBUG_ENTER;
83     std::lock_guard guard(lock_);
84     return SetupServer();
85 }
86 
Disable()87 void DSoftbusAdapterImpl::Disable()
88 {
89     CALL_DEBUG_ENTER;
90     std::lock_guard guard(lock_);
91     ShutdownServer();
92 }
93 
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)94 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
95 {
96     CALL_DEBUG_ENTER;
97     std::lock_guard guard(lock_);
98     CHKPV(observer);
99     observers_.erase(Observer());
100     observers_.emplace(observer);
101 }
102 
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)103 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
104 {
105     CALL_DEBUG_ENTER;
106     std::lock_guard guard(lock_);
107     if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
108         observers_.erase(iter);
109     }
110     observers_.erase(Observer());
111 }
112 
CheckDeviceOnline(const std::string & networkId)113 bool DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
114 {
115     CALL_DEBUG_ENTER;
116     std::vector<DistributedHardware::DmDeviceInfo> deviceList;
117     if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
118         FI_HILOGE("GetTrustedDeviceList failed");
119         return false;
120     }
121     if (deviceList.empty()) {
122         FI_HILOGE("Trust device list size is invalid");
123         return false;
124     }
125     for (const auto &deviceInfo : deviceList) {
126         if (std::string(deviceInfo.networkId) == networkId) {
127             return true;
128         }
129     }
130     return false;
131 }
132 
OpenSession(const std::string & networkId)133 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
134 {
135     CALL_DEBUG_ENTER;
136     std::lock_guard guard(lock_);
137 #ifdef ENABLE_PERFORMANCE_CHECK
138     auto startStamp = std::chrono::steady_clock::now();
139 #endif // ENABLE_PERFORMANCE_CHECK
140     if (!DSoftbusAdapterImpl::CheckDeviceOnline(networkId)) {
141         FI_HILOGE("CheckDeviceOnline failed, networkId:%{public}s", Utility::Anonymize(networkId).c_str());
142         return RET_ERR;
143     }
144     int32_t ret = OpenSessionLocked(networkId);
145 #ifdef ENABLE_PERFORMANCE_CHECK
146     auto openSessionDuration =
147         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - startStamp).count();
148     FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed:%{public}lld ms", ret, openSessionDuration);
149 #endif // ENABLE_PERFORMANCE_CHECK
150     if (ret != RET_OK) {
151         CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::FAULT);
152     } else {
153         CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::BEHAVIOR);
154     }
155     return ret;
156 }
157 
CloseSession(const std::string & networkId)158 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
159 {
160     CALL_INFO_TRACE;
161     std::lock_guard guard(lock_);
162     if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
163         ::Shutdown(iter->second.socket_);
164         sessions_.erase(iter);
165         FI_HILOGI(
166             "Shutdown session(%{public}d, %{public}s)", iter->second.socket_, Utility::Anonymize(networkId).c_str());
167     }
168 }
169 
CloseAllSessions()170 void DSoftbusAdapterImpl::CloseAllSessions()
171 {
172     CALL_INFO_TRACE;
173     std::lock_guard guard(lock_);
174     CloseAllSessionsLocked();
175 }
176 
FindConnection(const std::string & networkId)177 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
178 {
179     CALL_DEBUG_ENTER;
180     auto iter = sessions_.find(networkId);
181     return (iter != sessions_.end() ? iter->second.socket_ : -1);
182 }
183 
SendPacket(const std::string & networkId,NetPacket & packet)184 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
185 {
186     CALL_DEBUG_ENTER;
187     std::lock_guard guard(lock_);
188     int32_t socket = FindConnection(networkId);
189     if (socket < 0) {
190         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
191         return RET_ERR;
192     }
193     StreamBuffer buffer;
194     if (!packet.MakeData(buffer)) {
195         FI_HILOGE("Failed to buffer packet");
196         return RET_ERR;
197     }
198     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
199         FI_HILOGE("Packet is too large");
200         return RET_ERR;
201     }
202     int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
203     if (ret != SOFTBUS_OK) {
204         FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
205         return RET_ERR;
206     }
207     return RET_OK;
208 }
209 
SendParcel(const std::string & networkId,Parcel & parcel)210 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
211 {
212     CALL_DEBUG_ENTER;
213     std::lock_guard guard(lock_);
214     int32_t socket = FindConnection(networkId);
215     if (socket < 0) {
216         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
217         return RET_ERR;
218     }
219     int32_t ret = ::SendBytes(socket, reinterpret_cast<const void *>(parcel.GetData()), parcel.GetDataSize());
220     if (ret != SOFTBUS_OK) {
221         FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
222         return RET_ERR;
223     }
224     return RET_OK;
225 }
226 
BroadcastPacket(NetPacket & packet)227 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
228 {
229     CALL_INFO_TRACE;
230     std::lock_guard guard(lock_);
231     if (sessions_.empty()) {
232         FI_HILOGE("No session connected");
233         return RET_ERR;
234     }
235     StreamBuffer buffer;
236     if (!packet.MakeData(buffer)) {
237         FI_HILOGE("Failed to buffer packet");
238         return RET_ERR;
239     }
240     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
241         FI_HILOGE("Packet is too large");
242         return RET_ERR;
243     }
244     for (const auto &elem : sessions_) {
245         int32_t socket = elem.second.socket_;
246         if (socket < 0) {
247             FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
248             continue;
249         }
250         if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
251             FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
252             continue;
253         }
254         FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
255     }
256     return RET_OK;
257 }
258 
OnBindLink(int32_t socket,PeerSocketInfo info)259 static void OnBindLink(int32_t socket, PeerSocketInfo info)
260 {
261     DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
262 }
263 
OnShutdownLink(int32_t socket,ShutdownReason reason)264 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
265 {
266     DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
267 }
268 
OnBytesAvailable(int32_t socket,const void * data,uint32_t dataLen)269 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
270 {
271     DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
272 }
273 
OnBind(int32_t socket,PeerSocketInfo info)274 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
275 {
276     CALL_INFO_TRACE;
277     std::lock_guard guard(lock_);
278     std::string networkId = info.networkId;
279     FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
280     if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
281         if (iter->second.socket_ == socket) {
282             FI_HILOGI(
283                 "(%{public}d, %{public}s) has bound", iter->second.socket_, Utility::Anonymize(networkId).c_str());
284             return;
285         }
286         FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
287         sessions_.erase(iter);
288     }
289     ConfigTcpAlive(socket);
290     sessions_.emplace(networkId, Session(socket));
291 
292     for (const auto &item : observers_) {
293         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
294         if (observer != nullptr) {
295             FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
296             observer->OnBind(networkId);
297         }
298     }
299 }
300 
OnShutdown(int32_t socket,ShutdownReason reason)301 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
302 {
303     CALL_INFO_TRACE;
304     std::lock_guard guard(lock_);
305     auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(), [socket](const auto &item) {
306         return (item.second.socket_ == socket);
307     });
308     if (iter == sessions_.cend()) {
309         FI_HILOGD("Session(%{public}d) is not bound", socket);
310         return;
311     }
312     std::string networkId = iter->first;
313     sessions_.erase(iter);
314     FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
315 
316     for (const auto &item : observers_) {
317         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
318         if (observer != nullptr) {
319             FI_HILOGD(
320                 "Notify shutdown of session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
321             observer->OnShutdown(networkId);
322         }
323     }
324 }
325 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)326 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
327 {
328     CALL_DEBUG_ENTER;
329     std::lock_guard guard(lock_);
330     auto iter = std::find_if(sessions_.begin(), sessions_.end(), [socket](const auto &item) {
331         return (item.second.socket_ == socket);
332     });
333     if (iter == sessions_.end()) {
334         FI_HILOGE("Invalid socket:%{public}d", socket);
335         return;
336     }
337     const std::string networkId = iter->first;
338 
339     if (*reinterpret_cast<const uint32_t *>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
340         CircleStreamBuffer &circleBuffer = iter->second.buffer_;
341 
342         if (!circleBuffer.Write(reinterpret_cast<const char *>(data), dataLen)) {
343             FI_HILOGE("Failed to write buffer");
344         }
345         HandleSessionData(networkId, circleBuffer);
346     } else {
347         HandleRawData(networkId, data, dataLen);
348     }
349 }
350 
InitSocket(SocketInfo info,int32_t socketType,int32_t & socket)351 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
352 {
353     CALL_INFO_TRACE;
354     socket = ::Socket(info);
355     if (socket < 0) {
356         FI_HILOGE("DSOFTBUS::Socket failed");
357         return RET_ERR;
358     }
359     QosTV socketQos[] {
360         { .qos = QOS_TYPE_MIN_BW,      .value = MIN_BW },
361         { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
362         { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
363     };
364     ISocketListener listener {
365         .OnBind = OnBindLink,
366         .OnShutdown = OnShutdownLink,
367         .OnBytes = OnBytesAvailable,
368     };
369     int32_t ret { -1 };
370 
371     if (socketType == SOCKET_SERVER) {
372         ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
373         if (ret != 0) {
374             FI_HILOGE("DSOFTBUS::Listen failed");
375         }
376     } else if (socketType == SOCKET_CLIENT) {
377         ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
378         if (ret != 0) {
379             FI_HILOGE("DSOFTBUS::Bind failed");
380         }
381     }
382     if (ret != 0) {
383         ::Shutdown(socket);
384         socket = -1;
385         return ret;
386     }
387     return RET_OK;
388 }
389 
SetupServer()390 int32_t DSoftbusAdapterImpl::SetupServer()
391 {
392     CALL_INFO_TRACE;
393     if (socketFd_ > 0) {
394         return RET_OK;
395     }
396     char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
397     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
398     FI_HILOGI("Server session name: \'%{public}s\'", name);
399     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
400     SocketInfo info { .name = name, .pkgName = pkgName, .dataType = DATA_TYPE_BYTES };
401     int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
402     if (ret != RET_OK) {
403         FI_HILOGE("Failed to setup server");
404         return ret;
405     }
406     return RET_OK;
407 }
408 
ShutdownServer()409 void DSoftbusAdapterImpl::ShutdownServer()
410 {
411     CALL_INFO_TRACE;
412     CloseAllSessionsLocked();
413     if (socketFd_ > 0) {
414         ::Shutdown(socketFd_);
415         socketFd_ = -1;
416     }
417 }
418 
OpenSessionLocked(const std::string & networkId)419 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
420 {
421     CALL_INFO_TRACE;
422     if (sessions_.find(networkId) != sessions_.end()) {
423         FI_HILOGD("InputSoftbus session has already opened");
424         return RET_OK;
425     }
426     std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
427     char name[DEVICE_NAME_SIZE_MAX] {};
428     if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
429         FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
430         return RET_ERR;
431     }
432     char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
433     char peerNetworkId[PKG_NAME_SIZE_MAX] {};
434     if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
435         FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
436         return RET_ERR;
437     }
438     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
439     FI_HILOGI("Client session name: \'%{public}s\'", name);
440     FI_HILOGI("Peer name: \'%{public}s\'", peerName);
441     FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
442     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
443     SocketInfo info { .name = name,
444         .peerName = peerName,
445         .peerNetworkId = peerNetworkId,
446         .pkgName = pkgName,
447         .dataType = DATA_TYPE_BYTES };
448     int32_t socket { -1 };
449 
450     int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
451     if (ret != RET_OK) {
452         FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
453         return ret;
454     }
455     ConfigTcpAlive(socket);
456 
457     sessions_.emplace(networkId, Session(socket));
458     OnConnectedLocked(networkId);
459     return RET_OK;
460 }
461 
OnConnectedLocked(const std::string & networkId)462 void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
463 {
464     CALL_INFO_TRACE;
465     for (const auto &item : observers_) {
466         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
467         CHKPC(observer);
468         FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
469         observer->OnConnected(networkId);
470     }
471 }
472 
CloseAllSessionsLocked()473 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
474 {
475     std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
476         ::Shutdown(item.second.socket_);
477         FI_HILOGI("Shutdown connection with \'%{public}s\'", Utility::Anonymize(item.first).c_str());
478     });
479     sessions_.clear();
480 }
481 
ConfigTcpAlive(int32_t socket)482 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
483 {
484     CALL_DEBUG_ENTER;
485     if (socket < 0) {
486         FI_HILOGW("Config tcp alive, invalid sessionId");
487         return;
488     }
489     int32_t handle { -1 };
490     int32_t result = GetSessionHandle(socket, &handle);
491     if (result != RET_OK) {
492         FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
493         return;
494     }
495     int32_t keepAliveTimeout { 10 };
496     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
497     if (result != RET_OK) {
498         FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
499         return;
500     }
501     int32_t keepAliveCount { 5 };
502     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
503     if (result != RET_OK) {
504         FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
505         return;
506     }
507     int32_t interval { 1 };
508     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
509     if (result != RET_OK) {
510         FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
511         return;
512     }
513     int32_t enable { 1 };
514     result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
515     if (result != RET_OK) {
516         FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
517         return;
518     }
519     int32_t TimeoutMs { 15000 };
520     result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
521     if (result != RET_OK) {
522         FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
523         return;
524     }
525 }
526 
HandleSessionData(const std::string & networkId,CircleStreamBuffer & circleBuffer)527 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
528 {
529     CALL_DEBUG_ENTER;
530     while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
531         const char *buf = circleBuffer.ReadBuf();
532         const PackHead *head = reinterpret_cast<const PackHead *>(buf);
533 
534         if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
535             FI_HILOGE("Corrupted net packet");
536             break;
537         }
538         if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
539             FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
540                 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
541             break;
542         }
543         NetPacket packet(head->idMsg);
544 
545         if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
546             FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
547             break;
548         }
549         circleBuffer.SeekReadPos(packet.GetPacketLength());
550         HandlePacket(networkId, packet);
551     }
552 }
553 
HandlePacket(const std::string & networkId,NetPacket & packet)554 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
555 {
556     CALL_DEBUG_ENTER;
557     for (const auto &item : observers_) {
558         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
559         if ((observer != nullptr) && observer->OnPacket(networkId, packet)) {
560             return;
561         }
562     }
563 }
564 
HandleRawData(const std::string & networkId,const void * data,uint32_t dataLen)565 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
566 {
567     CALL_DEBUG_ENTER;
568     for (const auto &item : observers_) {
569         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
570         if ((observer != nullptr) && observer->OnRawData(networkId, data, dataLen)) {
571             return;
572         }
573     }
574 }
575 } // namespace DeviceStatus
576 } // namespace Msdp
577 } // namespace OHOS
578