• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <string>
17 #include <thread>
18 
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "data_level.h"
22 #include "device_manager_adapter.h"
23 #include "dfx_types.h"
24 #include "kvstore_utils.h"
25 #include "log_print.h"
26 #include "reporter.h"
27 #include "securec.h"
28 #include "session.h"
29 #include "softbus_adapter.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "SoftBusAdapter"
36 
37 namespace OHOS {
38 namespace AppDistributedKv {
39 using Context = DistributedData::CommunicatorContext;
40 constexpr uint32_t DEFAULT_MTU_SIZE = 4096 * 1024u;
41 constexpr uint32_t DEFAULT_TIMEOUT = 30 * 1000;
42 using namespace std;
43 using namespace OHOS::DistributedDataDfx;
44 using namespace OHOS::DistributedKv;
45 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
46 
47 class AppDataListenerWrap {
48 public:
49     static void SetDataHandler(SoftBusAdapter *handler);
50 
51     static void OnClientShutdown(int32_t socket, ShutdownReason reason);
52     static void OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
53 
54     static void OnServerBind(int32_t socket, PeerSocketInfo info);
55     static void OnServerShutdown(int32_t socket, ShutdownReason reason);
56     static void OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
57 
58 public:
59     // notify all listeners when received message
60     static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
61         const PipeInfo &pipeInfo);
62     static std::string GetPipeId(const std::string &name);
63 
64 private:
65     static SoftBusAdapter *softBusAdapter_;
66 };
67 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
68 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
69 
70 namespace {
OnDataLevelChanged(const char * networkId,const DataLevel dataLevel)71 void OnDataLevelChanged(const char* networkId, const DataLevel dataLevel)
72 {
73     if (networkId == nullptr) {
74         return;
75     }
76     LevelInfo level = {
77         .dynamic = dataLevel.dynamicLevel,
78         .statics = dataLevel.staticLevel,
79         .switches = dataLevel.switchLevel,
80         .switchesLen = dataLevel.switchLength,
81     };
82     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
83     SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, std::move(level));
84 }
85 
86 IDataLevelCb g_callback = {
87     .onDataLevelChanged = OnDataLevelChanged,
88 };
89 } // namespace
SoftBusAdapter()90 SoftBusAdapter::SoftBusAdapter()
91 {
92     ZLOGI("begin");
93     AppDataListenerWrap::SetDataHandler(this);
94 
95     clientListener_.OnShutdown = AppDataListenerWrap::OnClientShutdown;
96     clientListener_.OnBytes = AppDataListenerWrap::OnClientBytesReceived;
97     clientListener_.OnMessage = AppDataListenerWrap::OnClientBytesReceived;
98 
99     serverListener_.OnBind = AppDataListenerWrap::OnServerBind;
100     serverListener_.OnShutdown = AppDataListenerWrap::OnServerShutdown;
101     serverListener_.OnBytes = AppDataListenerWrap::OnServerBytesReceived;
102     serverListener_.OnMessage = AppDataListenerWrap::OnServerBytesReceived;
103 
104     auto status = DmAdapter::GetInstance().StartWatchDeviceChange(this, { "softBusAdapter" });
105     if (status != Status::SUCCESS) {
106         ZLOGW("register device change failed, status:%d", static_cast<int>(status));
107     }
108 
109     Context::GetInstance().SetSessionListener([this](const std::string &deviceId) {
110         StartCloseSessionTask(deviceId);
111     });
112 
113     ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) {
114         return CloseSession(networkId);
115     });
116     ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) {
117         auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
118         Context::GetInstance().NotifySessionClose(uuid);
119     });
120     ConnectManager::GetInstance()->OnStart();
121 }
122 
~SoftBusAdapter()123 SoftBusAdapter::~SoftBusAdapter()
124 {
125     ZLOGI("begin");
126     if (onBroadcast_) {
127         UnregDataLevelChangeCb(PKG_NAME);
128     }
129     connects_.Clear();
130     ConnectManager::GetInstance()->OnDestory();
131 }
132 
GetInstance()133 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
134 {
135     static std::once_flag onceFlag;
136     std::call_once(onceFlag, [&] {
137         instance_ = std::make_shared<SoftBusAdapter>();
138     });
139     return instance_;
140 }
141 
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)142 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
143 {
144     ZLOGD("begin");
145     if (observer == nullptr) {
146         return Status::INVALID_ARGUMENT;
147     }
148 
149     auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
150     if (!ret) {
151         ZLOGW("Add listener error or repeated adding.");
152         return Status::ERROR;
153     }
154 
155     return Status::SUCCESS;
156 }
157 
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)158 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
159     const PipeInfo &pipeInfo)
160 {
161     ZLOGD("begin");
162     if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
163         return Status::SUCCESS;
164     }
165     ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
166     return Status::ERROR;
167 }
168 
Reuse(const PipeInfo & pipeInfo,const DeviceId & deviceId,uint32_t qosType,std::shared_ptr<SoftBusClient> & conn)169 void SoftBusAdapter::Reuse(const PipeInfo &pipeInfo, const DeviceId &deviceId,
170     uint32_t qosType, std::shared_ptr<SoftBusClient> &conn)
171 {
172     std::vector<std::shared_ptr<SoftBusClient>> connects;
173     auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, qosType);
174     connect->isReuse = true;
175     connects.emplace_back(connect);
176     conn = connect;
177     connects_.Insert(deviceId.deviceId, connects);
178     ZLOGI("reuse connect:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId.deviceId).c_str());
179 }
180 
GetExpireTime(std::shared_ptr<SoftBusClient> & conn)181 void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
182 {
183     Time now = std::chrono::steady_clock::now();
184     auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
185     lock_guard<decltype(taskMutex_)> lock(taskMutex_);
186     if (taskId_ != ExecutorPool::INVALID_TASK_ID && expireTime < next_) {
187         taskId_ = Context::GetInstance().GetThreadPool()->Reset(taskId_, expireTime - now);
188         next_ = expireTime;
189     }
190 }
191 
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t length,const MessageInfo & info)192 Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
193     uint32_t length, const MessageInfo &info)
194 {
195     std::shared_ptr<SoftBusClient> conn;
196     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
197     uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
198     bool isReuse = false;
199     connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, isOHOSType, &isReuse](const auto &key,
200         std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
201         for (auto &connect : connects) {
202             if (connect->GetQoSType() != qosType) {
203                 continue;
204             }
205             if (!isOHOSType && connect->needRemove) {
206                 isReuse = true;
207                 return false;
208             }
209             conn = connect;
210             return true;
211         }
212         auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, qosType);
213         connects.emplace_back(connect);
214         conn = connect;
215         return true;
216     });
217     if (!isOHOSType && isReuse) {
218         Reuse(pipeInfo, deviceId, qosType, conn);
219     }
220     if (conn == nullptr) {
221         return Status::ERROR;
222     }
223     auto status = conn->CheckStatus();
224     if (status == Status::RATE_LIMIT) {
225         return Status::RATE_LIMIT;
226     }
227     if (status != Status::SUCCESS) {
228         auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
229             auto conn = connect.lock();
230             if (conn != nullptr) {
231                 conn->OpenConnect(&clientListener_);
232             }
233         };
234         auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
235         ConnectManager::GetInstance()->ApplyConnect(networkId, task);
236         return Status::RATE_LIMIT;
237     }
238 
239     status = conn->SendData(dataInfo, &clientListener_);
240     if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
241         GetExpireTime(conn);
242     }
243     return status;
244 }
245 
StartCloseSessionTask(const std::string & deviceId)246 void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
247 {
248     std::shared_ptr<SoftBusClient> conn;
249     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId);
250     uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
251     auto connects = connects_.Find(deviceId);
252     if (!connects.first) {
253         return;
254     }
255     for (auto &connect : connects.second) {
256         if (connect->GetQoSType() == qosType) {
257             conn = connect;
258             break;
259         }
260     }
261     if (conn == nullptr) {
262         return;
263     }
264     Time now = std::chrono::steady_clock::now();
265     auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
266     lock_guard<decltype(taskMutex_)> lock(taskMutex_);
267     if (taskId_ == ExecutorPool::INVALID_TASK_ID) {
268         ZLOGI("Start close session, deviceId:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId).c_str());
269         taskId_ = Context::GetInstance().GetThreadPool()->Schedule(expireTime - now, GetCloseSessionTask());
270         next_ = expireTime;
271     }
272 }
273 
GetCloseSessionTask()274 SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask()
275 {
276     return [this]() mutable {
277         Time now = std::chrono::steady_clock::now();
278         std::vector<std::shared_ptr<SoftBusClient>> connToClose;
279         connects_.ForEach([&now, &connToClose](const auto &key, auto &connects) -> bool {
280             std::vector<std::shared_ptr<SoftBusClient>> holdConnects;
281             for (auto conn : connects) {
282                 if (conn == nullptr) {
283                     continue;
284                 }
285                 auto expireTime = conn->GetExpireTime();
286                 if (expireTime <= now) {
287                     ZLOGI("[timeout] close session socket:%{public}d", conn->GetSocket());
288                     connToClose.emplace_back(conn);
289                 } else {
290                     holdConnects.emplace_back(conn);
291                 }
292             }
293             connects = std::move(holdConnects);
294             return false;
295         });
296         connects_.EraseIf([](const auto &key, const auto &conn) -> bool {
297             if (conn.empty()) {
298                 ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId);
299             }
300             return conn.empty();
301         });
302         Time next = INVALID_NEXT;
303         lock_guard<decltype(taskMutex_)> lg(taskMutex_);
304         connects_.ForEach([&next](const auto &key, auto &connects) -> bool {
305             for (auto conn : connects) {
306                 if (conn == nullptr) {
307                     continue;
308                 }
309                 auto expireTime = conn->GetExpireTime();
310                 if (expireTime < next) {
311                     next = expireTime;
312                 }
313             }
314             return false;
315         });
316         if (next == INVALID_NEXT) {
317             taskId_ = ExecutorPool::INVALID_TASK_ID;
318             return;
319         }
320         taskId_ = Context::GetInstance().GetThreadPool()->Schedule(
321             next > now ? next - now : ExecutorPool::INVALID_DELAY, GetCloseSessionTask());
322         next_ = next;
323     };
324 }
325 
GetMtuSize(const DeviceId & deviceId)326 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
327 {
328     uint32_t mtuSize = DEFAULT_MTU_SIZE;
329     connects_.ComputeIfPresent(deviceId.deviceId, [&mtuSize](auto, auto &connects) {
330         uint32_t mtu = 0;
331         for (auto conn : connects) {
332             if (conn == nullptr) {
333                 continue;
334             }
335             if (mtu < conn->GetMtuSize()) {
336                 mtu = conn->GetMtuSize();
337             }
338         }
339         if (mtu != 0) {
340             mtuSize = mtu;
341         }
342         return true;
343     });
344     return mtuSize;
345 }
346 
GetTimeout(const DeviceId & deviceId)347 uint32_t SoftBusAdapter::GetTimeout(const DeviceId &deviceId)
348 {
349     uint32_t interval = DEFAULT_TIMEOUT;
350     connects_.ComputeIfPresent(deviceId.deviceId, [&interval](auto, auto &connects) {
351         uint32_t time = 0;
352         for (auto conn : connects) {
353             if (conn == nullptr) {
354                 continue;
355             }
356             if (time < conn->GetTimeout()) {
357                 time = conn->GetTimeout();
358             }
359         }
360         if (time != 0) {
361             interval = time;
362         }
363         return true;
364     });
365     return interval;
366 }
367 
DelConnect(int32_t socket)368 std::string SoftBusAdapter::DelConnect(int32_t socket)
369 {
370     std::string name;
371     std::set<std::string> closedConnect;
372     connects_.EraseIf([socket, &name, &closedConnect](const auto &deviceId, auto &connects) -> bool {
373         for (auto iter = connects.begin(); iter != connects.end();) {
374             if (*iter != nullptr && **iter == socket) {
375                 name += deviceId;
376                 name += " ";
377                 iter = connects.erase(iter);
378             } else {
379                 iter++;
380             }
381         }
382         if (connects.empty()) {
383             closedConnect.insert(deviceId);
384             return true;
385         }
386         return false;
387     });
388     for (const auto &deviceId : closedConnect) {
389         auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId).networkId;
390         ConnectManager::GetInstance()->OnSessionClose(networkId);
391     }
392     return name;
393 }
394 
OnClientShutdown(int32_t socket)395 std::string SoftBusAdapter::OnClientShutdown(int32_t socket)
396 {
397     return DelConnect(socket);
398 }
399 
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)400 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
401     __attribute__((unused)) const struct DeviceId &peer)
402 {
403     ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
404         KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
405     return true;
406 }
407 
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)408 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
409 {
410     ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
411     flag_ = flag;
412 }
413 
CreateSessionServerAdapter(const std::string & sessionName)414 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
415 {
416     ZLOGD("begin");
417     SocketInfo socketInfo;
418     std::string sessionServerName = sessionName;
419     socketInfo.name = const_cast<char *>(sessionServerName.c_str());
420     std::string pkgName = "ohos.distributeddata";
421     socketInfo.pkgName = pkgName.data();
422     socket_ = Socket(socketInfo);
423     return Listen(socket_, Qos, QOS_COUNT, &serverListener_);
424 }
425 
RemoveSessionServerAdapter(const std::string & sessionName) const426 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
427 {
428     ZLOGD("begin");
429     Shutdown(socket_);
430     return 0;
431 }
432 
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)433 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
434     const PipeInfo &pipeInfo)
435 {
436     ZLOGD("begin");
437     auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
438         [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
439             ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
440                 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
441             DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
442             value->OnMessage(deviceInfo, data, size, pipeInfo);
443             TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
444             Reporter::GetInstance()->TrafficStatistic()->Report(ts);
445             return true;
446         });
447     if (!ret) {
448         ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
449     }
450 }
451 
Broadcast(const PipeInfo & pipeInfo,const LevelInfo & levelInfo)452 Status SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, const LevelInfo &levelInfo)
453 {
454     DataLevel level = {
455         .dynamicLevel = levelInfo.dynamic,
456         .staticLevel = levelInfo.statics,
457         .switchLevel = levelInfo.switches,
458         .switchLength = levelInfo.switchesLen,
459     };
460     auto status = SetDataLevel(&level);
461     if (status == SOFTBUS_FUNC_NOT_SUPPORT) {
462         return Status::NOT_SUPPORT_BROADCAST;
463     }
464     return status ? Status::ERROR : Status::SUCCESS;
465 }
466 
OnBroadcast(const DeviceId & device,const LevelInfo & levelInfo)467 void SoftBusAdapter::OnBroadcast(const DeviceId &device, const LevelInfo &levelInfo)
468 {
469     ZLOGI("device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
470     if (!onBroadcast_) {
471         ZLOGW("no listener device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
472         return;
473     }
474     onBroadcast_(device.deviceId, levelInfo);
475 }
476 
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,const LevelInfo &)> listener)477 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
478     std::function<void(const std::string &, const LevelInfo &)> listener)
479 {
480     if (onBroadcast_) {
481         return SOFTBUS_ALREADY_EXISTED;
482     }
483     onBroadcast_ = std::move(listener);
484     return RegDataLevelChangeCb(pipeInfo.pipeId.c_str(), &g_callback);
485 }
486 
SetDataHandler(SoftBusAdapter * handler)487 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
488 {
489     ZLOGI("begin");
490     softBusAdapter_ = handler;
491 }
492 
OnClientShutdown(int32_t socket,ShutdownReason reason)493 void AppDataListenerWrap::OnClientShutdown(int32_t socket, ShutdownReason reason)
494 {
495     // when the local close the session, this callback function will not be triggered;
496     // when the current function is called, soft bus has released the session resource, only connId is valid;
497     std::string name = softBusAdapter_->OnClientShutdown(socket);
498     ZLOGI("[shutdown] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
499 }
500 
OnClientBytesReceived(int32_t socket,const void * data,uint32_t dataLen)501 void AppDataListenerWrap::OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen) {}
502 
OnServerBind(int32_t socket,PeerSocketInfo info)503 void AppDataListenerWrap::OnServerBind(int32_t socket, PeerSocketInfo info)
504 {
505     softBusAdapter_->OnBind(socket, info);
506     std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
507 
508     ZLOGI("[OnServerBind] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s", socket, info.name,
509         KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str());
510 }
511 
OnServerShutdown(int32_t socket,ShutdownReason reason)512 void AppDataListenerWrap::OnServerShutdown(int32_t socket, ShutdownReason reason)
513 {
514     softBusAdapter_->OnServerShutdown(socket);
515     ZLOGI("Shut down reason:%{public}d socket id:%{public}d", reason, socket);
516 }
517 
OnServerBytesReceived(int32_t socket,const void * data,uint32_t dataLen)518 void AppDataListenerWrap::OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen)
519 {
520     SoftBusAdapter::ServerSocketInfo info;
521     if (!softBusAdapter_->GetPeerSocketInfo(socket, info)) {
522         ZLOGE("Get peer socket info failed, socket id %{public}d", socket);
523         return;
524     };
525     std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
526     ZLOGD("[OnBytesReceived] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s, data len:%{public}u",
527         socket, info.name.c_str(), KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str(), dataLen);
528 
529     std::string pipeId = GetPipeId(info.name);
530     if (pipeId.empty()) {
531         ZLOGE("pipId is invalid");
532         return;
533     }
534 
535     NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerDevUuid, { pipeId, "" });
536 }
537 
GetPipeId(const std::string & name)538 std::string AppDataListenerWrap::GetPipeId(const std::string &name)
539 {
540     auto pos = name.find('_');
541     if (pos != std::string::npos) {
542         return name.substr(0, pos);
543     }
544     return name;
545 }
546 
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)547 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
548     const PipeInfo &pipeInfo)
549 {
550     softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
551 }
552 
GetPeerSocketInfo(int32_t socket,ServerSocketInfo & info)553 bool SoftBusAdapter::GetPeerSocketInfo(int32_t socket, ServerSocketInfo &info)
554 {
555     auto it = peerSocketInfos_.Find(socket);
556     if (it.first) {
557         info = it.second;
558         return true;
559     }
560     return false;
561 }
562 
OnBind(int32_t socket,PeerSocketInfo info)563 void SoftBusAdapter::OnBind(int32_t socket, PeerSocketInfo info)
564 {
565     ServerSocketInfo socketInfo;
566     socketInfo.name = info.name;
567     socketInfo.networkId = info.networkId;
568     socketInfo.pkgName = info.pkgName;
569     peerSocketInfos_.Insert(socket, socketInfo);
570     if (!DmAdapter::GetInstance().IsOHOSType(info.networkId)) {
571         auto uuid = DmAdapter::GetInstance().ToUUID(info.networkId);
572         auto connects = connects_.Find(uuid);
573         if (!connects.first) {
574             return;
575         }
576         for (auto &conn : connects.second) {
577             if (!conn->isReuse) {
578                 conn->needRemove = true;
579             }
580         }
581     }
582 }
583 
OnServerShutdown(int32_t socket)584 void SoftBusAdapter::OnServerShutdown(int32_t socket)
585 {
586     peerSocketInfos_.Erase(socket);
587 }
588 
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const589 void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
590     const AppDistributedKv::DeviceChangeType &type) const
591 {
592     return;
593 }
594 
CloseSession(const std::string & networkId)595 bool SoftBusAdapter::CloseSession(const std::string &networkId)
596 {
597     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
598     auto ret = connects_.Erase(uuid);
599     if (ret != 0) {
600         ConnectManager::GetInstance()->OnSessionClose(networkId);
601     }
602     return ret != 0;
603 }
604 } // namespace AppDistributedKv
605 } // namespace OHOS