1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <string>
17 #include <thread>
18
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "data_level.h"
22 #include "device_manager_adapter.h"
23 #include "dfx_types.h"
24 #include "kvstore_utils.h"
25 #include "log_print.h"
26 #include "reporter.h"
27 #include "securec.h"
28 #include "session.h"
29 #include "softbus_adapter.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "SoftBusAdapter"
36
37 namespace OHOS {
38 namespace AppDistributedKv {
39 using Context = DistributedData::CommunicatorContext;
40 constexpr uint32_t DEFAULT_MTU_SIZE = 4096 * 1024u;
41 constexpr uint32_t DEFAULT_TIMEOUT = 30 * 1000;
42 using namespace std;
43 using namespace OHOS::DistributedDataDfx;
44 using namespace OHOS::DistributedKv;
45 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
46
47 class AppDataListenerWrap {
48 public:
49 static void SetDataHandler(SoftBusAdapter *handler);
50
51 static void OnClientShutdown(int32_t socket, ShutdownReason reason);
52 static void OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
53
54 static void OnServerBind(int32_t socket, PeerSocketInfo info);
55 static void OnServerShutdown(int32_t socket, ShutdownReason reason);
56 static void OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
57
58 public:
59 // notify all listeners when received message
60 static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
61 const PipeInfo &pipeInfo);
62 static std::string GetPipeId(const std::string &name);
63
64 private:
65 static SoftBusAdapter *softBusAdapter_;
66 };
67 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
68 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
69
70 namespace {
OnDataLevelChanged(const char * networkId,const DataLevel dataLevel)71 void OnDataLevelChanged(const char* networkId, const DataLevel dataLevel)
72 {
73 if (networkId == nullptr) {
74 return;
75 }
76 LevelInfo level = {
77 .dynamic = dataLevel.dynamicLevel,
78 .statics = dataLevel.staticLevel,
79 .switches = dataLevel.switchLevel,
80 .switchesLen = dataLevel.switchLength,
81 };
82 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
83 SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, std::move(level));
84 }
85
86 IDataLevelCb g_callback = {
87 .onDataLevelChanged = OnDataLevelChanged,
88 };
89 } // namespace
SoftBusAdapter()90 SoftBusAdapter::SoftBusAdapter()
91 {
92 ZLOGI("begin");
93 AppDataListenerWrap::SetDataHandler(this);
94
95 clientListener_.OnShutdown = AppDataListenerWrap::OnClientShutdown;
96 clientListener_.OnBytes = AppDataListenerWrap::OnClientBytesReceived;
97 clientListener_.OnMessage = AppDataListenerWrap::OnClientBytesReceived;
98
99 serverListener_.OnBind = AppDataListenerWrap::OnServerBind;
100 serverListener_.OnShutdown = AppDataListenerWrap::OnServerShutdown;
101 serverListener_.OnBytes = AppDataListenerWrap::OnServerBytesReceived;
102 serverListener_.OnMessage = AppDataListenerWrap::OnServerBytesReceived;
103
104 auto status = DmAdapter::GetInstance().StartWatchDeviceChange(this, { "softBusAdapter" });
105 if (status != Status::SUCCESS) {
106 ZLOGW("register device change failed, status:%d", static_cast<int>(status));
107 }
108
109 Context::GetInstance().SetSessionListener([this](const std::string &deviceId) {
110 StartCloseSessionTask(deviceId);
111 });
112
113 ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) {
114 return CloseSession(networkId);
115 });
116 ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) {
117 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
118 Context::GetInstance().NotifySessionClose(uuid);
119 });
120 ConnectManager::GetInstance()->OnStart();
121 }
122
~SoftBusAdapter()123 SoftBusAdapter::~SoftBusAdapter()
124 {
125 ZLOGI("begin");
126 if (onBroadcast_) {
127 UnregDataLevelChangeCb(PKG_NAME);
128 }
129 connects_.Clear();
130 ConnectManager::GetInstance()->OnDestory();
131 }
132
GetInstance()133 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
134 {
135 static std::once_flag onceFlag;
136 std::call_once(onceFlag, [&] {
137 instance_ = std::make_shared<SoftBusAdapter>();
138 });
139 return instance_;
140 }
141
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)142 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
143 {
144 ZLOGD("begin");
145 if (observer == nullptr) {
146 return Status::INVALID_ARGUMENT;
147 }
148
149 auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
150 if (!ret) {
151 ZLOGW("Add listener error or repeated adding.");
152 return Status::ERROR;
153 }
154
155 return Status::SUCCESS;
156 }
157
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)158 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
159 const PipeInfo &pipeInfo)
160 {
161 ZLOGD("begin");
162 if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
163 return Status::SUCCESS;
164 }
165 ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
166 return Status::ERROR;
167 }
168
Reuse(const PipeInfo & pipeInfo,const DeviceId & deviceId,uint32_t qosType,std::shared_ptr<SoftBusClient> & conn)169 void SoftBusAdapter::Reuse(const PipeInfo &pipeInfo, const DeviceId &deviceId,
170 uint32_t qosType, std::shared_ptr<SoftBusClient> &conn)
171 {
172 std::vector<std::shared_ptr<SoftBusClient>> connects;
173 auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, qosType);
174 connect->isReuse = true;
175 connects.emplace_back(connect);
176 conn = connect;
177 connects_.Insert(deviceId.deviceId, connects);
178 ZLOGI("reuse connect:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId.deviceId).c_str());
179 }
180
GetExpireTime(std::shared_ptr<SoftBusClient> & conn)181 void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
182 {
183 Time now = std::chrono::steady_clock::now();
184 auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
185 lock_guard<decltype(taskMutex_)> lock(taskMutex_);
186 if (taskId_ != ExecutorPool::INVALID_TASK_ID && expireTime < next_) {
187 taskId_ = Context::GetInstance().GetThreadPool()->Reset(taskId_, expireTime - now);
188 next_ = expireTime;
189 }
190 }
191
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t length,const MessageInfo & info)192 Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
193 uint32_t length, const MessageInfo &info)
194 {
195 std::shared_ptr<SoftBusClient> conn;
196 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
197 uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
198 bool isReuse = false;
199 connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, isOHOSType, &isReuse](const auto &key,
200 std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
201 for (auto &connect : connects) {
202 if (connect->GetQoSType() != qosType) {
203 continue;
204 }
205 if (!isOHOSType && connect->needRemove) {
206 isReuse = true;
207 return false;
208 }
209 conn = connect;
210 return true;
211 }
212 auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, qosType);
213 connects.emplace_back(connect);
214 conn = connect;
215 return true;
216 });
217 if (!isOHOSType && isReuse) {
218 Reuse(pipeInfo, deviceId, qosType, conn);
219 }
220 if (conn == nullptr) {
221 return Status::ERROR;
222 }
223 auto status = conn->CheckStatus();
224 if (status == Status::RATE_LIMIT) {
225 return Status::RATE_LIMIT;
226 }
227 if (status != Status::SUCCESS) {
228 auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
229 auto conn = connect.lock();
230 if (conn != nullptr) {
231 conn->OpenConnect(&clientListener_);
232 }
233 };
234 auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
235 ConnectManager::GetInstance()->ApplyConnect(networkId, task);
236 return Status::RATE_LIMIT;
237 }
238
239 status = conn->SendData(dataInfo, &clientListener_);
240 if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
241 GetExpireTime(conn);
242 }
243 return status;
244 }
245
StartCloseSessionTask(const std::string & deviceId)246 void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
247 {
248 std::shared_ptr<SoftBusClient> conn;
249 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId);
250 uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
251 auto connects = connects_.Find(deviceId);
252 if (!connects.first) {
253 return;
254 }
255 for (auto &connect : connects.second) {
256 if (connect->GetQoSType() == qosType) {
257 conn = connect;
258 break;
259 }
260 }
261 if (conn == nullptr) {
262 return;
263 }
264 Time now = std::chrono::steady_clock::now();
265 auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
266 lock_guard<decltype(taskMutex_)> lock(taskMutex_);
267 if (taskId_ == ExecutorPool::INVALID_TASK_ID) {
268 ZLOGI("Start close session, deviceId:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId).c_str());
269 taskId_ = Context::GetInstance().GetThreadPool()->Schedule(expireTime - now, GetCloseSessionTask());
270 next_ = expireTime;
271 }
272 }
273
GetCloseSessionTask()274 SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask()
275 {
276 return [this]() mutable {
277 Time now = std::chrono::steady_clock::now();
278 std::vector<std::shared_ptr<SoftBusClient>> connToClose;
279 connects_.ForEach([&now, &connToClose](const auto &key, auto &connects) -> bool {
280 std::vector<std::shared_ptr<SoftBusClient>> holdConnects;
281 for (auto conn : connects) {
282 if (conn == nullptr) {
283 continue;
284 }
285 auto expireTime = conn->GetExpireTime();
286 if (expireTime <= now) {
287 ZLOGI("[timeout] close session socket:%{public}d", conn->GetSocket());
288 connToClose.emplace_back(conn);
289 } else {
290 holdConnects.emplace_back(conn);
291 }
292 }
293 connects = std::move(holdConnects);
294 return false;
295 });
296 connects_.EraseIf([](const auto &key, const auto &conn) -> bool {
297 if (conn.empty()) {
298 ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId);
299 }
300 return conn.empty();
301 });
302 Time next = INVALID_NEXT;
303 lock_guard<decltype(taskMutex_)> lg(taskMutex_);
304 connects_.ForEach([&next](const auto &key, auto &connects) -> bool {
305 for (auto conn : connects) {
306 if (conn == nullptr) {
307 continue;
308 }
309 auto expireTime = conn->GetExpireTime();
310 if (expireTime < next) {
311 next = expireTime;
312 }
313 }
314 return false;
315 });
316 if (next == INVALID_NEXT) {
317 taskId_ = ExecutorPool::INVALID_TASK_ID;
318 return;
319 }
320 taskId_ = Context::GetInstance().GetThreadPool()->Schedule(
321 next > now ? next - now : ExecutorPool::INVALID_DELAY, GetCloseSessionTask());
322 next_ = next;
323 };
324 }
325
GetMtuSize(const DeviceId & deviceId)326 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
327 {
328 uint32_t mtuSize = DEFAULT_MTU_SIZE;
329 connects_.ComputeIfPresent(deviceId.deviceId, [&mtuSize](auto, auto &connects) {
330 uint32_t mtu = 0;
331 for (auto conn : connects) {
332 if (conn == nullptr) {
333 continue;
334 }
335 if (mtu < conn->GetMtuSize()) {
336 mtu = conn->GetMtuSize();
337 }
338 }
339 if (mtu != 0) {
340 mtuSize = mtu;
341 }
342 return true;
343 });
344 return mtuSize;
345 }
346
GetTimeout(const DeviceId & deviceId)347 uint32_t SoftBusAdapter::GetTimeout(const DeviceId &deviceId)
348 {
349 uint32_t interval = DEFAULT_TIMEOUT;
350 connects_.ComputeIfPresent(deviceId.deviceId, [&interval](auto, auto &connects) {
351 uint32_t time = 0;
352 for (auto conn : connects) {
353 if (conn == nullptr) {
354 continue;
355 }
356 if (time < conn->GetTimeout()) {
357 time = conn->GetTimeout();
358 }
359 }
360 if (time != 0) {
361 interval = time;
362 }
363 return true;
364 });
365 return interval;
366 }
367
DelConnect(int32_t socket)368 std::string SoftBusAdapter::DelConnect(int32_t socket)
369 {
370 std::string name;
371 std::set<std::string> closedConnect;
372 connects_.EraseIf([socket, &name, &closedConnect](const auto &deviceId, auto &connects) -> bool {
373 for (auto iter = connects.begin(); iter != connects.end();) {
374 if (*iter != nullptr && **iter == socket) {
375 name += deviceId;
376 name += " ";
377 iter = connects.erase(iter);
378 } else {
379 iter++;
380 }
381 }
382 if (connects.empty()) {
383 closedConnect.insert(deviceId);
384 return true;
385 }
386 return false;
387 });
388 for (const auto &deviceId : closedConnect) {
389 auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId).networkId;
390 ConnectManager::GetInstance()->OnSessionClose(networkId);
391 }
392 return name;
393 }
394
OnClientShutdown(int32_t socket)395 std::string SoftBusAdapter::OnClientShutdown(int32_t socket)
396 {
397 return DelConnect(socket);
398 }
399
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)400 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
401 __attribute__((unused)) const struct DeviceId &peer)
402 {
403 ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
404 KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
405 return true;
406 }
407
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)408 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
409 {
410 ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
411 flag_ = flag;
412 }
413
CreateSessionServerAdapter(const std::string & sessionName)414 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
415 {
416 ZLOGD("begin");
417 SocketInfo socketInfo;
418 std::string sessionServerName = sessionName;
419 socketInfo.name = const_cast<char *>(sessionServerName.c_str());
420 std::string pkgName = "ohos.distributeddata";
421 socketInfo.pkgName = pkgName.data();
422 socket_ = Socket(socketInfo);
423 return Listen(socket_, Qos, QOS_COUNT, &serverListener_);
424 }
425
RemoveSessionServerAdapter(const std::string & sessionName) const426 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
427 {
428 ZLOGD("begin");
429 Shutdown(socket_);
430 return 0;
431 }
432
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)433 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
434 const PipeInfo &pipeInfo)
435 {
436 ZLOGD("begin");
437 auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
438 [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
439 ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
440 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
441 DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
442 value->OnMessage(deviceInfo, data, size, pipeInfo);
443 TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
444 Reporter::GetInstance()->TrafficStatistic()->Report(ts);
445 return true;
446 });
447 if (!ret) {
448 ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
449 }
450 }
451
Broadcast(const PipeInfo & pipeInfo,const LevelInfo & levelInfo)452 Status SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, const LevelInfo &levelInfo)
453 {
454 DataLevel level = {
455 .dynamicLevel = levelInfo.dynamic,
456 .staticLevel = levelInfo.statics,
457 .switchLevel = levelInfo.switches,
458 .switchLength = levelInfo.switchesLen,
459 };
460 auto status = SetDataLevel(&level);
461 if (status == SOFTBUS_FUNC_NOT_SUPPORT) {
462 return Status::NOT_SUPPORT_BROADCAST;
463 }
464 return status ? Status::ERROR : Status::SUCCESS;
465 }
466
OnBroadcast(const DeviceId & device,const LevelInfo & levelInfo)467 void SoftBusAdapter::OnBroadcast(const DeviceId &device, const LevelInfo &levelInfo)
468 {
469 ZLOGI("device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
470 if (!onBroadcast_) {
471 ZLOGW("no listener device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
472 return;
473 }
474 onBroadcast_(device.deviceId, levelInfo);
475 }
476
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,const LevelInfo &)> listener)477 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
478 std::function<void(const std::string &, const LevelInfo &)> listener)
479 {
480 if (onBroadcast_) {
481 return SOFTBUS_ALREADY_EXISTED;
482 }
483 onBroadcast_ = std::move(listener);
484 return RegDataLevelChangeCb(pipeInfo.pipeId.c_str(), &g_callback);
485 }
486
SetDataHandler(SoftBusAdapter * handler)487 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
488 {
489 ZLOGI("begin");
490 softBusAdapter_ = handler;
491 }
492
OnClientShutdown(int32_t socket,ShutdownReason reason)493 void AppDataListenerWrap::OnClientShutdown(int32_t socket, ShutdownReason reason)
494 {
495 // when the local close the session, this callback function will not be triggered;
496 // when the current function is called, soft bus has released the session resource, only connId is valid;
497 std::string name = softBusAdapter_->OnClientShutdown(socket);
498 ZLOGI("[shutdown] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
499 }
500
OnClientBytesReceived(int32_t socket,const void * data,uint32_t dataLen)501 void AppDataListenerWrap::OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen) {}
502
OnServerBind(int32_t socket,PeerSocketInfo info)503 void AppDataListenerWrap::OnServerBind(int32_t socket, PeerSocketInfo info)
504 {
505 softBusAdapter_->OnBind(socket, info);
506 std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
507
508 ZLOGI("[OnServerBind] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s", socket, info.name,
509 KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str());
510 }
511
OnServerShutdown(int32_t socket,ShutdownReason reason)512 void AppDataListenerWrap::OnServerShutdown(int32_t socket, ShutdownReason reason)
513 {
514 softBusAdapter_->OnServerShutdown(socket);
515 ZLOGI("Shut down reason:%{public}d socket id:%{public}d", reason, socket);
516 }
517
OnServerBytesReceived(int32_t socket,const void * data,uint32_t dataLen)518 void AppDataListenerWrap::OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen)
519 {
520 SoftBusAdapter::ServerSocketInfo info;
521 if (!softBusAdapter_->GetPeerSocketInfo(socket, info)) {
522 ZLOGE("Get peer socket info failed, socket id %{public}d", socket);
523 return;
524 };
525 std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
526 ZLOGD("[OnBytesReceived] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s, data len:%{public}u",
527 socket, info.name.c_str(), KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str(), dataLen);
528
529 std::string pipeId = GetPipeId(info.name);
530 if (pipeId.empty()) {
531 ZLOGE("pipId is invalid");
532 return;
533 }
534
535 NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerDevUuid, { pipeId, "" });
536 }
537
GetPipeId(const std::string & name)538 std::string AppDataListenerWrap::GetPipeId(const std::string &name)
539 {
540 auto pos = name.find('_');
541 if (pos != std::string::npos) {
542 return name.substr(0, pos);
543 }
544 return name;
545 }
546
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)547 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
548 const PipeInfo &pipeInfo)
549 {
550 softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
551 }
552
GetPeerSocketInfo(int32_t socket,ServerSocketInfo & info)553 bool SoftBusAdapter::GetPeerSocketInfo(int32_t socket, ServerSocketInfo &info)
554 {
555 auto it = peerSocketInfos_.Find(socket);
556 if (it.first) {
557 info = it.second;
558 return true;
559 }
560 return false;
561 }
562
OnBind(int32_t socket,PeerSocketInfo info)563 void SoftBusAdapter::OnBind(int32_t socket, PeerSocketInfo info)
564 {
565 ServerSocketInfo socketInfo;
566 socketInfo.name = info.name;
567 socketInfo.networkId = info.networkId;
568 socketInfo.pkgName = info.pkgName;
569 peerSocketInfos_.Insert(socket, socketInfo);
570 if (!DmAdapter::GetInstance().IsOHOSType(info.networkId)) {
571 auto uuid = DmAdapter::GetInstance().ToUUID(info.networkId);
572 auto connects = connects_.Find(uuid);
573 if (!connects.first) {
574 return;
575 }
576 for (auto &conn : connects.second) {
577 if (!conn->isReuse) {
578 conn->needRemove = true;
579 }
580 }
581 }
582 }
583
OnServerShutdown(int32_t socket)584 void SoftBusAdapter::OnServerShutdown(int32_t socket)
585 {
586 peerSocketInfos_.Erase(socket);
587 }
588
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const589 void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
590 const AppDistributedKv::DeviceChangeType &type) const
591 {
592 return;
593 }
594
CloseSession(const std::string & networkId)595 bool SoftBusAdapter::CloseSession(const std::string &networkId)
596 {
597 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
598 auto ret = connects_.Erase(uuid);
599 if (ret != 0) {
600 ConnectManager::GetInstance()->OnSessionClose(networkId);
601 }
602 return ret != 0;
603 }
604 } // namespace AppDistributedKv
605 } // namespace OHOS