• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <string>
17 #include <thread>
18 
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "data_level.h"
22 #include "device_manager_adapter.h"
23 #include "dfx_types.h"
24 #include "kvstore_utils.h"
25 #include "log_print.h"
26 #include "reporter.h"
27 #include "securec.h"
28 #include "session.h"
29 #include "softbus_adapter.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "SoftBusAdapter"
36 
37 namespace OHOS {
38 namespace AppDistributedKv {
39 using Context = DistributedData::CommunicatorContext;
40 constexpr uint32_t DEFAULT_MTU_SIZE = 4096 * 1024u;
41 constexpr uint32_t DEFAULT_TIMEOUT = 30 * 1000;
42 using namespace std;
43 using namespace OHOS::DistributedDataDfx;
44 using namespace OHOS::DistributedKv;
45 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
46 
47 class AppDataListenerWrap {
48 public:
49     static void SetDataHandler(SoftBusAdapter *handler);
50 
51     static void OnClientShutdown(int32_t socket, ShutdownReason reason);
52     static void OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
53     static void OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount);
54 
55     static void OnServerBind(int32_t socket, PeerSocketInfo info);
56     static void OnServerShutdown(int32_t socket, ShutdownReason reason);
57     static void OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
58 
59 private:
60     // notify all listeners when received message
61     static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
62         const PipeInfo &pipeInfo);
63     static std::string GetPipeId(const std::string &name);
64 
65     static SoftBusAdapter *softBusAdapter_;
66 };
67 
68 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
69 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
70 
71 namespace {
OnDataLevelChanged(const char * networkId,const DataLevel dataLevel)72 void OnDataLevelChanged(const char* networkId, const DataLevel dataLevel)
73 {
74     if (networkId == nullptr) {
75         return;
76     }
77     LevelInfo level = {
78         .dynamic = dataLevel.dynamicLevel,
79         .statics = dataLevel.staticLevel,
80         .switches = dataLevel.switchLevel,
81         .switchesLen = dataLevel.switchLength,
82     };
83     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
84     SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, std::move(level));
85 }
86 
87 IDataLevelCb g_callback = {
88     .onDataLevelChanged = OnDataLevelChanged,
89 };
90 } // namespace
SoftBusAdapter()91 SoftBusAdapter::SoftBusAdapter()
92 {
93     ZLOGI("begin");
94     AppDataListenerWrap::SetDataHandler(this);
95 
96     clientListener_.OnShutdown = AppDataListenerWrap::OnClientShutdown;
97     clientListener_.OnBytes = AppDataListenerWrap::OnClientBytesReceived;
98     clientListener_.OnMessage = AppDataListenerWrap::OnClientBytesReceived;
99     clientListener_.OnQos = AppDataListenerWrap::OnClientSocketChanged;
100 
101     serverListener_.OnBind = AppDataListenerWrap::OnServerBind;
102     serverListener_.OnShutdown = AppDataListenerWrap::OnServerShutdown;
103     serverListener_.OnBytes = AppDataListenerWrap::OnServerBytesReceived;
104     serverListener_.OnMessage = AppDataListenerWrap::OnServerBytesReceived;
105 
106     auto status = DmAdapter::GetInstance().StartWatchDeviceChange(this, { "softBusAdapter" });
107     if (status != Status::SUCCESS) {
108         ZLOGW("register device change failed, status:%d", static_cast<int>(status));
109     }
110 
111     Context::GetInstance().SetSessionListener([this](const std::string &deviceId) {
112         StartCloseSessionTask(deviceId);
113     });
114 
115     ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) {
116         return CloseSession(networkId);
117     });
118     ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) {
119         auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
120         Context::GetInstance().NotifySessionClose(uuid);
121     });
122     ConnectManager::GetInstance()->OnStart();
123 }
124 
~SoftBusAdapter()125 SoftBusAdapter::~SoftBusAdapter()
126 {
127     ZLOGI("begin");
128     if (onBroadcast_) {
129         UnregDataLevelChangeCb(PKG_NAME);
130     }
131     connects_.Clear();
132     ConnectManager::GetInstance()->OnDestory();
133 }
134 
GetInstance()135 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
136 {
137     static std::once_flag onceFlag;
138     std::call_once(onceFlag, [&] {
139         instance_ = std::make_shared<SoftBusAdapter>();
140     });
141     return instance_;
142 }
143 
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)144 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
145 {
146     ZLOGD("begin");
147     if (observer == nullptr) {
148         return Status::INVALID_ARGUMENT;
149     }
150 
151     auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
152     if (!ret) {
153         ZLOGW("Add listener error or repeated adding.");
154         return Status::ERROR;
155     }
156 
157     return Status::SUCCESS;
158 }
159 
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)160 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
161     const PipeInfo &pipeInfo)
162 {
163     ZLOGD("begin");
164     if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
165         return Status::SUCCESS;
166     }
167     ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
168     return Status::ERROR;
169 }
170 
GetExpireTime(std::shared_ptr<SoftBusClient> & conn)171 void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
172 {
173     Time now = std::chrono::steady_clock::now();
174     auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
175     lock_guard<decltype(taskMutex_)> lock(taskMutex_);
176     if (taskId_ != ExecutorPool::INVALID_TASK_ID && expireTime < next_) {
177         taskId_ = Context::GetInstance().GetThreadPool()->Reset(taskId_, expireTime - now);
178         next_ = expireTime;
179     }
180 }
181 
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t length,const MessageInfo & info)182 std::pair<Status, int32_t> SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
183     const DataInfo &dataInfo, uint32_t length, const MessageInfo &info)
184 {
185     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
186     uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
187     std::shared_ptr<SoftBusClient> conn = GetConnect(pipeInfo, deviceId, qosType);
188     if (conn == nullptr) {
189         return std::make_pair(Status::ERROR, 0);
190     }
191     auto status = conn->CheckStatus();
192     if (status == Status::RATE_LIMIT) {
193         return std::make_pair(Status::RATE_LIMIT, 0);
194     }
195     if (status != Status::SUCCESS) {
196         return OpenConnect(conn, deviceId);
197     }
198     status = conn->SendData(dataInfo, &clientListener_);
199     if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
200         GetExpireTime(conn);
201     }
202     auto errCode = conn->GetSoftBusError();
203     return std::make_pair(status, errCode);
204 }
205 
GetConnect(const PipeInfo & pipeInfo,const DeviceId & deviceId,uint32_t qosType)206 std::shared_ptr<SoftBusClient> SoftBusAdapter::GetConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId,
207     uint32_t qosType)
208 {
209     std::shared_ptr<SoftBusClient> conn;
210     std::string networkId = DmAdapter::GetInstance().ToNetworkID(deviceId.deviceId);
211     connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, &networkId](const auto &key,
212         std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
213         for (auto &connect : connects) {
214             if (connect == nullptr) {
215                 continue;
216             }
217             if (connect->GetQoSType() == qosType) {
218                 conn = connect;
219                 return true;
220             }
221         }
222         auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, networkId, qosType);
223         connects.emplace_back(connect);
224         conn = connect;
225         return true;
226     });
227     return conn;
228 }
229 
OpenConnect(const std::shared_ptr<SoftBusClient> & conn,const DeviceId & deviceId)230 std::pair<Status, int32_t> SoftBusAdapter::OpenConnect(const std::shared_ptr<SoftBusClient> &conn,
231     const DeviceId &deviceId)
232 {
233     auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
234         auto conn = connect.lock();
235         if (conn != nullptr) {
236             conn->OpenConnect(&clientListener_);
237         }
238     };
239     auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
240     ConnectManager::GetInstance()->ApplyConnect(networkId, task);
241     return std::make_pair(Status::RATE_LIMIT, 0);
242 }
243 
StartCloseSessionTask(const std::string & deviceId)244 void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
245 {
246     std::shared_ptr<SoftBusClient> conn;
247     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId);
248     uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
249     auto connects = connects_.Find(deviceId);
250     if (!connects.first) {
251         return;
252     }
253     for (auto &connect : connects.second) {
254         if (connect->GetQoSType() == qosType) {
255             conn = connect;
256             break;
257         }
258     }
259     if (conn == nullptr) {
260         return;
261     }
262     Time now = std::chrono::steady_clock::now();
263     auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
264     lock_guard<decltype(taskMutex_)> lock(taskMutex_);
265     if (taskId_ == ExecutorPool::INVALID_TASK_ID) {
266         ZLOGI("Start close session, deviceId:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId).c_str());
267         taskId_ = Context::GetInstance().GetThreadPool()->Schedule(expireTime - now, GetCloseSessionTask());
268         next_ = expireTime;
269     }
270 }
271 
GetCloseSessionTask()272 SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask()
273 {
274     return [this]() mutable {
275         Time now = std::chrono::steady_clock::now();
276         std::vector<std::shared_ptr<SoftBusClient>> connToClose;
277         connects_.ForEach([&now, &connToClose](const auto &key, auto &connects) -> bool {
278             std::vector<std::shared_ptr<SoftBusClient>> holdConnects;
279             for (auto conn : connects) {
280                 if (conn == nullptr) {
281                     continue;
282                 }
283                 auto expireTime = conn->GetExpireTime();
284                 if (expireTime <= now) {
285                     ZLOGI("[timeout] close session socket:%{public}d", conn->GetSocket());
286                     connToClose.emplace_back(conn);
287                 } else {
288                     holdConnects.emplace_back(conn);
289                 }
290             }
291             connects = std::move(holdConnects);
292             return false;
293         });
294         connects_.EraseIf([](const auto &key, const auto &conn) -> bool {
295             if (conn.empty()) {
296                 ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId);
297             }
298             return conn.empty();
299         });
300         Time next = INVALID_NEXT;
301         lock_guard<decltype(taskMutex_)> lg(taskMutex_);
302         connects_.ForEach([&next](const auto &key, auto &connects) -> bool {
303             for (auto conn : connects) {
304                 if (conn == nullptr) {
305                     continue;
306                 }
307                 auto expireTime = conn->GetExpireTime();
308                 if (expireTime < next) {
309                     next = expireTime;
310                 }
311             }
312             return false;
313         });
314         if (next == INVALID_NEXT) {
315             taskId_ = ExecutorPool::INVALID_TASK_ID;
316             return;
317         }
318         taskId_ = Context::GetInstance().GetThreadPool()->Schedule(
319             next > now ? next - now : ExecutorPool::INVALID_DELAY, GetCloseSessionTask());
320         next_ = next;
321     };
322 }
323 
GetMtuSize(const DeviceId & deviceId)324 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
325 {
326     uint32_t mtuSize = DEFAULT_MTU_SIZE;
327     connects_.ComputeIfPresent(deviceId.deviceId, [&mtuSize](auto, auto &connects) {
328         uint32_t mtu = 0;
329         for (auto conn : connects) {
330             if (conn == nullptr) {
331                 continue;
332             }
333             if (mtu < conn->GetMtuSize()) {
334                 mtu = conn->GetMtuSize();
335             }
336         }
337         if (mtu != 0) {
338             mtuSize = mtu;
339         }
340         return true;
341     });
342     return mtuSize;
343 }
344 
GetTimeout(const DeviceId & deviceId)345 uint32_t SoftBusAdapter::GetTimeout(const DeviceId &deviceId)
346 {
347     return DEFAULT_TIMEOUT;
348 }
349 
DelConnect(int32_t socket,bool isForce)350 std::string SoftBusAdapter::DelConnect(int32_t socket, bool isForce)
351 {
352     std::string name;
353     std::set<std::string> closedConnect;
354     connects_.EraseIf([socket, isForce, &name, &closedConnect](const auto &deviceId, auto &connects) -> bool {
355         if (!isForce && DmAdapter::GetInstance().IsOHOSType(deviceId)) {
356             return false;
357         }
358         std::string networkId;
359         for (auto iter = connects.begin(); iter != connects.end();) {
360             if (*iter != nullptr && **iter == socket) {
361                 name += deviceId;
362                 name += " ";
363                 networkId = (*iter)->GetNetworkId();
364                 iter = connects.erase(iter);
365             } else {
366                 iter++;
367             }
368         }
369         if (connects.empty() && !networkId.empty()) {
370             closedConnect.insert(std::move(networkId));
371             return true;
372         }
373         return false;
374     });
375     for (const auto &networkId : closedConnect) {
376         ConnectManager::GetInstance()->OnSessionClose(networkId);
377     }
378     return name;
379 }
380 
OnClientShutdown(int32_t socket,bool isForce)381 std::string SoftBusAdapter::OnClientShutdown(int32_t socket, bool isForce)
382 {
383     return DelConnect(socket, isForce);
384 }
385 
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)386 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
387     __attribute__((unused)) const struct DeviceId &peer)
388 {
389     ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
390         KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
391     return true;
392 }
393 
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)394 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
395 {
396     ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
397     flag_ = flag;
398 }
399 
CreateSessionServerAdapter(const std::string & sessionName)400 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
401 {
402     ZLOGD("begin");
403     SocketInfo socketInfo;
404     std::string sessionServerName = sessionName;
405     socketInfo.name = const_cast<char *>(sessionServerName.c_str());
406     std::string pkgName = "ohos.distributeddata";
407     socketInfo.pkgName = pkgName.data();
408     socket_ = Socket(socketInfo);
409     return Listen(socket_, Qos, QOS_COUNT, &serverListener_);
410 }
411 
RemoveSessionServerAdapter(const std::string & sessionName) const412 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
413 {
414     ZLOGD("begin");
415     Shutdown(socket_);
416     return 0;
417 }
418 
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)419 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
420     const PipeInfo &pipeInfo)
421 {
422     ZLOGD("begin");
423     auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
424         [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
425             ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
426                 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
427             DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
428             value->OnMessage(deviceInfo, data, size, pipeInfo);
429             TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
430             Reporter::GetInstance()->TrafficStatistic()->Report(ts);
431             return true;
432         });
433     if (!ret) {
434         ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
435     }
436 }
437 
Broadcast(const PipeInfo & pipeInfo,const LevelInfo & levelInfo)438 Status SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, const LevelInfo &levelInfo)
439 {
440     DataLevel level = {
441         .dynamicLevel = levelInfo.dynamic,
442         .staticLevel = levelInfo.statics,
443         .switchLevel = levelInfo.switches,
444         .switchLength = levelInfo.switchesLen,
445     };
446     auto status = SetDataLevel(&level);
447     if (status == SOFTBUS_FUNC_NOT_SUPPORT) {
448         return Status::NOT_SUPPORT_BROADCAST;
449     }
450     return status ? Status::ERROR : Status::SUCCESS;
451 }
452 
OnBroadcast(const DeviceId & device,const LevelInfo & levelInfo)453 void SoftBusAdapter::OnBroadcast(const DeviceId &device, const LevelInfo &levelInfo)
454 {
455     ZLOGI("device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
456     if (!onBroadcast_) {
457         ZLOGW("no listener device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
458         return;
459     }
460     onBroadcast_(device.deviceId, levelInfo);
461 }
462 
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,const LevelInfo &)> listener)463 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
464     std::function<void(const std::string &, const LevelInfo &)> listener)
465 {
466     if (onBroadcast_) {
467         return SOFTBUS_ALREADY_EXISTED;
468     }
469     onBroadcast_ = std::move(listener);
470     return RegDataLevelChangeCb(pipeInfo.pipeId.c_str(), &g_callback);
471 }
472 
SetDataHandler(SoftBusAdapter * handler)473 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
474 {
475     ZLOGI("begin");
476     softBusAdapter_ = handler;
477 }
478 
OnClientShutdown(int32_t socket,ShutdownReason reason)479 void AppDataListenerWrap::OnClientShutdown(int32_t socket, ShutdownReason reason)
480 {
481     // when the local close the session, this callback function will not be triggered;
482     // when the current function is called, soft bus has released the session resource, only connId is valid;
483     std::string name = softBusAdapter_->OnClientShutdown(socket);
484     ZLOGI("[shutdown] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
485 }
486 
OnClientBytesReceived(int32_t socket,const void * data,uint32_t dataLen)487 void AppDataListenerWrap::OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen) {}
488 
OnClientSocketChanged(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)489 void AppDataListenerWrap::OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
490 {
491     if (eventId == QoSEvent::QOS_SATISFIED && qos != nullptr && qos[0].qos == QOS_TYPE_MIN_BW && qosCount == 1) {
492         auto name = softBusAdapter_->OnClientShutdown(socket, false);
493         ZLOGI("[SocketChanged] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
494     }
495 }
496 
OnServerBind(int32_t socket,PeerSocketInfo info)497 void AppDataListenerWrap::OnServerBind(int32_t socket, PeerSocketInfo info)
498 {
499     softBusAdapter_->OnBind(socket, info);
500     std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
501 
502     ZLOGI("[OnServerBind] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s", socket, info.name,
503         KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str());
504 }
505 
OnServerShutdown(int32_t socket,ShutdownReason reason)506 void AppDataListenerWrap::OnServerShutdown(int32_t socket, ShutdownReason reason)
507 {
508     softBusAdapter_->OnServerShutdown(socket);
509     ZLOGI("Shut down reason:%{public}d socket id:%{public}d", reason, socket);
510 }
511 
OnServerBytesReceived(int32_t socket,const void * data,uint32_t dataLen)512 void AppDataListenerWrap::OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen)
513 {
514     SoftBusAdapter::ServerSocketInfo info;
515     if (!softBusAdapter_->GetPeerSocketInfo(socket, info)) {
516         ZLOGE("Get peer socket info failed, socket id %{public}d", socket);
517         return;
518     };
519     std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
520     ZLOGD("[OnBytesReceived] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s, data len:%{public}u",
521         socket, info.name.c_str(), KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str(), dataLen);
522 
523     std::string pipeId = GetPipeId(info.name);
524     if (pipeId.empty()) {
525         ZLOGE("pipId is invalid");
526         return;
527     }
528 
529     NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerDevUuid, { pipeId, "" });
530 }
531 
GetPipeId(const std::string & name)532 std::string AppDataListenerWrap::GetPipeId(const std::string &name)
533 {
534     auto pos = name.find('_');
535     if (pos != std::string::npos) {
536         return name.substr(0, pos);
537     }
538     return name;
539 }
540 
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)541 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
542     const PipeInfo &pipeInfo)
543 {
544     softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
545 }
546 
GetPeerSocketInfo(int32_t socket,ServerSocketInfo & info)547 bool SoftBusAdapter::GetPeerSocketInfo(int32_t socket, ServerSocketInfo &info)
548 {
549     auto it = peerSocketInfos_.Find(socket);
550     if (it.first) {
551         info = it.second;
552         return true;
553     }
554     return false;
555 }
556 
OnBind(int32_t socket,PeerSocketInfo info)557 void SoftBusAdapter::OnBind(int32_t socket, PeerSocketInfo info)
558 {
559     ServerSocketInfo socketInfo;
560     socketInfo.name = info.name;
561     socketInfo.networkId = info.networkId;
562     socketInfo.pkgName = info.pkgName;
563     peerSocketInfos_.Insert(socket, socketInfo);
564 }
565 
OnServerShutdown(int32_t socket)566 void SoftBusAdapter::OnServerShutdown(int32_t socket)
567 {
568     peerSocketInfos_.Erase(socket);
569 }
570 
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const571 void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
572     const AppDistributedKv::DeviceChangeType &type) const
573 {
574     return;
575 }
576 
CloseSession(const std::string & networkId)577 bool SoftBusAdapter::CloseSession(const std::string &networkId)
578 {
579     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
580     auto ret = connects_.Erase(uuid);
581     if (ret != 0) {
582         ConnectManager::GetInstance()->OnSessionClose(networkId);
583     }
584     return ret != 0;
585 }
586 
ReuseConnect(const PipeInfo & pipeInfo,const DeviceId & deviceId)587 Status SoftBusAdapter::ReuseConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId)
588 {
589     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
590     if (!isOHOSType) {
591         return Status::NOT_SUPPORT;
592     }
593     uint32_t qosType = SoftBusClient::QOS_HML;
594     std::shared_ptr<SoftBusClient> conn = GetConnect(pipeInfo, deviceId, qosType);
595     if (conn == nullptr) {
596         return Status::ERROR;
597     }
598     auto status = conn->ReuseConnect(&clientListener_);
599     if (status != Status::SUCCESS) {
600         return status;
601     }
602     // Avoid being cleared by scheduled tasks
603     connects_.Compute(deviceId.deviceId, [&conn, qosType](const auto &key,
604         std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
605         for (auto &connect : connects) {
606             if (connect == nullptr) {
607                 continue;
608             }
609             if (connect->GetQoSType() == qosType) {
610                 return true;
611             }
612         }
613         connects.emplace_back(conn);
614         return true;
615     });
616     StartCloseSessionTask(deviceId.deviceId);
617     return Status::SUCCESS;
618 }
619 } // namespace AppDistributedKv
620 } // namespace OHOS