1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <string>
17 #include <thread>
18
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "data_level.h"
22 #include "device_manager_adapter.h"
23 #include "dfx_types.h"
24 #include "kvstore_utils.h"
25 #include "log_print.h"
26 #include "reporter.h"
27 #include "securec.h"
28 #include "session.h"
29 #include "softbus_adapter.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "SoftBusAdapter"
36
37 namespace OHOS {
38 namespace AppDistributedKv {
39 using Context = DistributedData::CommunicatorContext;
40 constexpr uint32_t DEFAULT_MTU_SIZE = 4096 * 1024u;
41 constexpr uint32_t DEFAULT_TIMEOUT = 30 * 1000;
42 using namespace std;
43 using namespace OHOS::DistributedDataDfx;
44 using namespace OHOS::DistributedKv;
45 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
46
47 class AppDataListenerWrap {
48 public:
49 static void SetDataHandler(SoftBusAdapter *handler);
50
51 static void OnClientShutdown(int32_t socket, ShutdownReason reason);
52 static void OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
53 static void OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount);
54
55 static void OnServerBind(int32_t socket, PeerSocketInfo info);
56 static void OnServerShutdown(int32_t socket, ShutdownReason reason);
57 static void OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
58
59 private:
60 // notify all listeners when received message
61 static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
62 const PipeInfo &pipeInfo);
63 static std::string GetPipeId(const std::string &name);
64
65 static SoftBusAdapter *softBusAdapter_;
66 };
67
68 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
69 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
70
71 namespace {
OnDataLevelChanged(const char * networkId,const DataLevel dataLevel)72 void OnDataLevelChanged(const char* networkId, const DataLevel dataLevel)
73 {
74 if (networkId == nullptr) {
75 return;
76 }
77 LevelInfo level = {
78 .dynamic = dataLevel.dynamicLevel,
79 .statics = dataLevel.staticLevel,
80 .switches = dataLevel.switchLevel,
81 .switchesLen = dataLevel.switchLength,
82 };
83 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
84 SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, std::move(level));
85 }
86
87 IDataLevelCb g_callback = {
88 .onDataLevelChanged = OnDataLevelChanged,
89 };
90 } // namespace
SoftBusAdapter()91 SoftBusAdapter::SoftBusAdapter()
92 {
93 ZLOGI("begin");
94 AppDataListenerWrap::SetDataHandler(this);
95
96 clientListener_.OnShutdown = AppDataListenerWrap::OnClientShutdown;
97 clientListener_.OnBytes = AppDataListenerWrap::OnClientBytesReceived;
98 clientListener_.OnMessage = AppDataListenerWrap::OnClientBytesReceived;
99 clientListener_.OnQos = AppDataListenerWrap::OnClientSocketChanged;
100
101 serverListener_.OnBind = AppDataListenerWrap::OnServerBind;
102 serverListener_.OnShutdown = AppDataListenerWrap::OnServerShutdown;
103 serverListener_.OnBytes = AppDataListenerWrap::OnServerBytesReceived;
104 serverListener_.OnMessage = AppDataListenerWrap::OnServerBytesReceived;
105
106 auto status = DmAdapter::GetInstance().StartWatchDeviceChange(this, { "softBusAdapter" });
107 if (status != Status::SUCCESS) {
108 ZLOGW("register device change failed, status:%d", static_cast<int>(status));
109 }
110
111 Context::GetInstance().SetSessionListener([this](const std::string &deviceId) {
112 StartCloseSessionTask(deviceId);
113 });
114
115 ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) {
116 return CloseSession(networkId);
117 });
118 ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) {
119 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
120 Context::GetInstance().NotifySessionClose(uuid);
121 });
122 ConnectManager::GetInstance()->OnStart();
123 }
124
~SoftBusAdapter()125 SoftBusAdapter::~SoftBusAdapter()
126 {
127 ZLOGI("begin");
128 if (onBroadcast_) {
129 UnregDataLevelChangeCb(PKG_NAME);
130 }
131 connects_.Clear();
132 ConnectManager::GetInstance()->OnDestory();
133 }
134
GetInstance()135 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
136 {
137 static std::once_flag onceFlag;
138 std::call_once(onceFlag, [&] {
139 instance_ = std::make_shared<SoftBusAdapter>();
140 });
141 return instance_;
142 }
143
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)144 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
145 {
146 ZLOGD("begin");
147 if (observer == nullptr) {
148 return Status::INVALID_ARGUMENT;
149 }
150
151 auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
152 if (!ret) {
153 ZLOGW("Add listener error or repeated adding.");
154 return Status::ERROR;
155 }
156
157 return Status::SUCCESS;
158 }
159
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)160 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
161 const PipeInfo &pipeInfo)
162 {
163 ZLOGD("begin");
164 if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
165 return Status::SUCCESS;
166 }
167 ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
168 return Status::ERROR;
169 }
170
GetExpireTime(std::shared_ptr<SoftBusClient> & conn)171 void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
172 {
173 Time now = std::chrono::steady_clock::now();
174 auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
175 lock_guard<decltype(taskMutex_)> lock(taskMutex_);
176 if (taskId_ != ExecutorPool::INVALID_TASK_ID && expireTime < next_) {
177 taskId_ = Context::GetInstance().GetThreadPool()->Reset(taskId_, expireTime - now);
178 next_ = expireTime;
179 }
180 }
181
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t length,const MessageInfo & info)182 std::pair<Status, int32_t> SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
183 const DataInfo &dataInfo, uint32_t length, const MessageInfo &info)
184 {
185 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
186 uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
187 std::shared_ptr<SoftBusClient> conn = GetConnect(pipeInfo, deviceId, qosType);
188 if (conn == nullptr) {
189 return std::make_pair(Status::ERROR, 0);
190 }
191 auto status = conn->CheckStatus();
192 if (status == Status::RATE_LIMIT) {
193 return std::make_pair(Status::RATE_LIMIT, 0);
194 }
195 if (status != Status::SUCCESS) {
196 return OpenConnect(conn, deviceId);
197 }
198 status = conn->SendData(dataInfo, &clientListener_);
199 if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
200 GetExpireTime(conn);
201 }
202 auto errCode = conn->GetSoftBusError();
203 return std::make_pair(status, errCode);
204 }
205
GetConnect(const PipeInfo & pipeInfo,const DeviceId & deviceId,uint32_t qosType)206 std::shared_ptr<SoftBusClient> SoftBusAdapter::GetConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId,
207 uint32_t qosType)
208 {
209 std::shared_ptr<SoftBusClient> conn;
210 std::string networkId = DmAdapter::GetInstance().ToNetworkID(deviceId.deviceId);
211 connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, &networkId](const auto &key,
212 std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
213 for (auto &connect : connects) {
214 if (connect == nullptr) {
215 continue;
216 }
217 if (connect->GetQoSType() == qosType) {
218 conn = connect;
219 return true;
220 }
221 }
222 auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, networkId, qosType);
223 connects.emplace_back(connect);
224 conn = connect;
225 return true;
226 });
227 return conn;
228 }
229
OpenConnect(const std::shared_ptr<SoftBusClient> & conn,const DeviceId & deviceId)230 std::pair<Status, int32_t> SoftBusAdapter::OpenConnect(const std::shared_ptr<SoftBusClient> &conn,
231 const DeviceId &deviceId)
232 {
233 auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
234 auto conn = connect.lock();
235 if (conn != nullptr) {
236 conn->OpenConnect(&clientListener_);
237 }
238 };
239 auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
240 ConnectManager::GetInstance()->ApplyConnect(networkId, task);
241 return std::make_pair(Status::RATE_LIMIT, 0);
242 }
243
StartCloseSessionTask(const std::string & deviceId)244 void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
245 {
246 std::shared_ptr<SoftBusClient> conn;
247 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId);
248 uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
249 auto connects = connects_.Find(deviceId);
250 if (!connects.first) {
251 return;
252 }
253 for (auto &connect : connects.second) {
254 if (connect->GetQoSType() == qosType) {
255 conn = connect;
256 break;
257 }
258 }
259 if (conn == nullptr) {
260 return;
261 }
262 Time now = std::chrono::steady_clock::now();
263 auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
264 lock_guard<decltype(taskMutex_)> lock(taskMutex_);
265 if (taskId_ == ExecutorPool::INVALID_TASK_ID) {
266 ZLOGI("Start close session, deviceId:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId).c_str());
267 taskId_ = Context::GetInstance().GetThreadPool()->Schedule(expireTime - now, GetCloseSessionTask());
268 next_ = expireTime;
269 }
270 }
271
GetCloseSessionTask()272 SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask()
273 {
274 return [this]() mutable {
275 Time now = std::chrono::steady_clock::now();
276 std::vector<std::shared_ptr<SoftBusClient>> connToClose;
277 connects_.ForEach([&now, &connToClose](const auto &key, auto &connects) -> bool {
278 std::vector<std::shared_ptr<SoftBusClient>> holdConnects;
279 for (auto conn : connects) {
280 if (conn == nullptr) {
281 continue;
282 }
283 auto expireTime = conn->GetExpireTime();
284 if (expireTime <= now) {
285 ZLOGI("[timeout] close session socket:%{public}d", conn->GetSocket());
286 connToClose.emplace_back(conn);
287 } else {
288 holdConnects.emplace_back(conn);
289 }
290 }
291 connects = std::move(holdConnects);
292 return false;
293 });
294 connects_.EraseIf([](const auto &key, const auto &conn) -> bool {
295 if (conn.empty()) {
296 ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId);
297 }
298 return conn.empty();
299 });
300 Time next = INVALID_NEXT;
301 lock_guard<decltype(taskMutex_)> lg(taskMutex_);
302 connects_.ForEach([&next](const auto &key, auto &connects) -> bool {
303 for (auto conn : connects) {
304 if (conn == nullptr) {
305 continue;
306 }
307 auto expireTime = conn->GetExpireTime();
308 if (expireTime < next) {
309 next = expireTime;
310 }
311 }
312 return false;
313 });
314 if (next == INVALID_NEXT) {
315 taskId_ = ExecutorPool::INVALID_TASK_ID;
316 return;
317 }
318 taskId_ = Context::GetInstance().GetThreadPool()->Schedule(
319 next > now ? next - now : ExecutorPool::INVALID_DELAY, GetCloseSessionTask());
320 next_ = next;
321 };
322 }
323
GetMtuSize(const DeviceId & deviceId)324 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
325 {
326 uint32_t mtuSize = DEFAULT_MTU_SIZE;
327 connects_.ComputeIfPresent(deviceId.deviceId, [&mtuSize](auto, auto &connects) {
328 uint32_t mtu = 0;
329 for (auto conn : connects) {
330 if (conn == nullptr) {
331 continue;
332 }
333 if (mtu < conn->GetMtuSize()) {
334 mtu = conn->GetMtuSize();
335 }
336 }
337 if (mtu != 0) {
338 mtuSize = mtu;
339 }
340 return true;
341 });
342 return mtuSize;
343 }
344
GetTimeout(const DeviceId & deviceId)345 uint32_t SoftBusAdapter::GetTimeout(const DeviceId &deviceId)
346 {
347 return DEFAULT_TIMEOUT;
348 }
349
DelConnect(int32_t socket,bool isForce)350 std::string SoftBusAdapter::DelConnect(int32_t socket, bool isForce)
351 {
352 std::string name;
353 std::set<std::string> closedConnect;
354 connects_.EraseIf([socket, isForce, &name, &closedConnect](const auto &deviceId, auto &connects) -> bool {
355 if (!isForce && DmAdapter::GetInstance().IsOHOSType(deviceId)) {
356 return false;
357 }
358 std::string networkId;
359 for (auto iter = connects.begin(); iter != connects.end();) {
360 if (*iter != nullptr && **iter == socket) {
361 name += deviceId;
362 name += " ";
363 networkId = (*iter)->GetNetworkId();
364 iter = connects.erase(iter);
365 } else {
366 iter++;
367 }
368 }
369 if (connects.empty() && !networkId.empty()) {
370 closedConnect.insert(std::move(networkId));
371 return true;
372 }
373 return false;
374 });
375 for (const auto &networkId : closedConnect) {
376 ConnectManager::GetInstance()->OnSessionClose(networkId);
377 }
378 return name;
379 }
380
OnClientShutdown(int32_t socket,bool isForce)381 std::string SoftBusAdapter::OnClientShutdown(int32_t socket, bool isForce)
382 {
383 return DelConnect(socket, isForce);
384 }
385
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)386 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
387 __attribute__((unused)) const struct DeviceId &peer)
388 {
389 ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
390 KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
391 return true;
392 }
393
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)394 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
395 {
396 ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
397 flag_ = flag;
398 }
399
CreateSessionServerAdapter(const std::string & sessionName)400 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
401 {
402 ZLOGD("begin");
403 SocketInfo socketInfo;
404 std::string sessionServerName = sessionName;
405 socketInfo.name = const_cast<char *>(sessionServerName.c_str());
406 std::string pkgName = "ohos.distributeddata";
407 socketInfo.pkgName = pkgName.data();
408 socket_ = Socket(socketInfo);
409 return Listen(socket_, Qos, QOS_COUNT, &serverListener_);
410 }
411
RemoveSessionServerAdapter(const std::string & sessionName) const412 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
413 {
414 ZLOGD("begin");
415 Shutdown(socket_);
416 return 0;
417 }
418
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)419 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
420 const PipeInfo &pipeInfo)
421 {
422 ZLOGD("begin");
423 auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
424 [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
425 ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
426 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
427 DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
428 value->OnMessage(deviceInfo, data, size, pipeInfo);
429 TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
430 Reporter::GetInstance()->TrafficStatistic()->Report(ts);
431 return true;
432 });
433 if (!ret) {
434 ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
435 }
436 }
437
Broadcast(const PipeInfo & pipeInfo,const LevelInfo & levelInfo)438 Status SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, const LevelInfo &levelInfo)
439 {
440 DataLevel level = {
441 .dynamicLevel = levelInfo.dynamic,
442 .staticLevel = levelInfo.statics,
443 .switchLevel = levelInfo.switches,
444 .switchLength = levelInfo.switchesLen,
445 };
446 auto status = SetDataLevel(&level);
447 if (status == SOFTBUS_FUNC_NOT_SUPPORT) {
448 return Status::NOT_SUPPORT_BROADCAST;
449 }
450 return status ? Status::ERROR : Status::SUCCESS;
451 }
452
OnBroadcast(const DeviceId & device,const LevelInfo & levelInfo)453 void SoftBusAdapter::OnBroadcast(const DeviceId &device, const LevelInfo &levelInfo)
454 {
455 ZLOGI("device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
456 if (!onBroadcast_) {
457 ZLOGW("no listener device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
458 return;
459 }
460 onBroadcast_(device.deviceId, levelInfo);
461 }
462
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,const LevelInfo &)> listener)463 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
464 std::function<void(const std::string &, const LevelInfo &)> listener)
465 {
466 if (onBroadcast_) {
467 return SOFTBUS_ALREADY_EXISTED;
468 }
469 onBroadcast_ = std::move(listener);
470 return RegDataLevelChangeCb(pipeInfo.pipeId.c_str(), &g_callback);
471 }
472
SetDataHandler(SoftBusAdapter * handler)473 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
474 {
475 ZLOGI("begin");
476 softBusAdapter_ = handler;
477 }
478
OnClientShutdown(int32_t socket,ShutdownReason reason)479 void AppDataListenerWrap::OnClientShutdown(int32_t socket, ShutdownReason reason)
480 {
481 // when the local close the session, this callback function will not be triggered;
482 // when the current function is called, soft bus has released the session resource, only connId is valid;
483 std::string name = softBusAdapter_->OnClientShutdown(socket);
484 ZLOGI("[shutdown] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
485 }
486
OnClientBytesReceived(int32_t socket,const void * data,uint32_t dataLen)487 void AppDataListenerWrap::OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen) {}
488
OnClientSocketChanged(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)489 void AppDataListenerWrap::OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
490 {
491 if (eventId == QoSEvent::QOS_SATISFIED && qos != nullptr && qos[0].qos == QOS_TYPE_MIN_BW && qosCount == 1) {
492 auto name = softBusAdapter_->OnClientShutdown(socket, false);
493 ZLOGI("[SocketChanged] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
494 }
495 }
496
OnServerBind(int32_t socket,PeerSocketInfo info)497 void AppDataListenerWrap::OnServerBind(int32_t socket, PeerSocketInfo info)
498 {
499 softBusAdapter_->OnBind(socket, info);
500 std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
501
502 ZLOGI("[OnServerBind] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s", socket, info.name,
503 KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str());
504 }
505
OnServerShutdown(int32_t socket,ShutdownReason reason)506 void AppDataListenerWrap::OnServerShutdown(int32_t socket, ShutdownReason reason)
507 {
508 softBusAdapter_->OnServerShutdown(socket);
509 ZLOGI("Shut down reason:%{public}d socket id:%{public}d", reason, socket);
510 }
511
OnServerBytesReceived(int32_t socket,const void * data,uint32_t dataLen)512 void AppDataListenerWrap::OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen)
513 {
514 SoftBusAdapter::ServerSocketInfo info;
515 if (!softBusAdapter_->GetPeerSocketInfo(socket, info)) {
516 ZLOGE("Get peer socket info failed, socket id %{public}d", socket);
517 return;
518 };
519 std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
520 ZLOGD("[OnBytesReceived] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s, data len:%{public}u",
521 socket, info.name.c_str(), KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str(), dataLen);
522
523 std::string pipeId = GetPipeId(info.name);
524 if (pipeId.empty()) {
525 ZLOGE("pipId is invalid");
526 return;
527 }
528
529 NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerDevUuid, { pipeId, "" });
530 }
531
GetPipeId(const std::string & name)532 std::string AppDataListenerWrap::GetPipeId(const std::string &name)
533 {
534 auto pos = name.find('_');
535 if (pos != std::string::npos) {
536 return name.substr(0, pos);
537 }
538 return name;
539 }
540
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)541 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
542 const PipeInfo &pipeInfo)
543 {
544 softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
545 }
546
GetPeerSocketInfo(int32_t socket,ServerSocketInfo & info)547 bool SoftBusAdapter::GetPeerSocketInfo(int32_t socket, ServerSocketInfo &info)
548 {
549 auto it = peerSocketInfos_.Find(socket);
550 if (it.first) {
551 info = it.second;
552 return true;
553 }
554 return false;
555 }
556
OnBind(int32_t socket,PeerSocketInfo info)557 void SoftBusAdapter::OnBind(int32_t socket, PeerSocketInfo info)
558 {
559 ServerSocketInfo socketInfo;
560 socketInfo.name = info.name;
561 socketInfo.networkId = info.networkId;
562 socketInfo.pkgName = info.pkgName;
563 peerSocketInfos_.Insert(socket, socketInfo);
564 }
565
OnServerShutdown(int32_t socket)566 void SoftBusAdapter::OnServerShutdown(int32_t socket)
567 {
568 peerSocketInfos_.Erase(socket);
569 }
570
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const571 void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
572 const AppDistributedKv::DeviceChangeType &type) const
573 {
574 return;
575 }
576
CloseSession(const std::string & networkId)577 bool SoftBusAdapter::CloseSession(const std::string &networkId)
578 {
579 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
580 auto ret = connects_.Erase(uuid);
581 if (ret != 0) {
582 ConnectManager::GetInstance()->OnSessionClose(networkId);
583 }
584 return ret != 0;
585 }
586
ReuseConnect(const PipeInfo & pipeInfo,const DeviceId & deviceId)587 Status SoftBusAdapter::ReuseConnect(const PipeInfo &pipeInfo, const DeviceId &deviceId)
588 {
589 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
590 if (!isOHOSType) {
591 return Status::NOT_SUPPORT;
592 }
593 uint32_t qosType = SoftBusClient::QOS_HML;
594 std::shared_ptr<SoftBusClient> conn = GetConnect(pipeInfo, deviceId, qosType);
595 if (conn == nullptr) {
596 return Status::ERROR;
597 }
598 auto status = conn->ReuseConnect(&clientListener_);
599 if (status != Status::SUCCESS) {
600 return status;
601 }
602 // Avoid being cleared by scheduled tasks
603 connects_.Compute(deviceId.deviceId, [&conn, qosType](const auto &key,
604 std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
605 for (auto &connect : connects) {
606 if (connect == nullptr) {
607 continue;
608 }
609 if (connect->GetQoSType() == qosType) {
610 return true;
611 }
612 }
613 connects.emplace_back(conn);
614 return true;
615 });
616 StartCloseSessionTask(deviceId.deviceId);
617 return Status::SUCCESS;
618 }
619 } // namespace AppDistributedKv
620 } // namespace OHOS