• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <logger.h>
17 
18 #include <mutex>
19 #include <thread>
20 
21 #include "dev_manager.h"
22 #include "kv_store_delegate_manager.h"
23 #include "process_communicator_impl.h"
24 #include "securec.h"
25 #include "session.h"
26 #include "softbus_adapter.h"
27 #include "softbus_bus_center.h"
28 
29 namespace OHOS {
30 namespace ObjectStore {
31 constexpr int32_t HEAD_SIZE = 3;
32 constexpr int32_t END_SIZE = 3;
33 constexpr int32_t MIN_SIZE = HEAD_SIZE + END_SIZE + 3;
34 constexpr const char *REPLACE_CHAIN = "***";
35 constexpr const char *DEFAULT_ANONYMOUS = "******";
36 constexpr int32_t SOFTBUS_OK = 0;
37 constexpr int32_t SOFTBUS_ERR = 1;
38 constexpr int32_t INVALID_SESSION_ID = -1;
39 constexpr int32_t SESSION_NAME_SIZE_MAX = 65;
40 constexpr int32_t DEVICE_ID_SIZE_MAX = 65;
41 constexpr size_t TASK_CAPACITY_MAX = 15;
42 constexpr int32_t SELF_SIDE = 1;
43 
44 using namespace std;
45 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
46 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
47 
SoftBusAdapter()48 SoftBusAdapter::SoftBusAdapter()
49 {
50     LOG_INFO("begin");
51     taskQueue_ = std::make_shared<TaskScheduler>(TASK_CAPACITY_MAX, "data_object");
52     AppDataListenerWrap::SetDataHandler(this);
53 
54     sessionListener_.OnSessionOpened = AppDataListenerWrap::OnSessionOpened;
55     sessionListener_.OnSessionClosed = AppDataListenerWrap::OnSessionClosed;
56     sessionListener_.OnBytesReceived = AppDataListenerWrap::OnBytesReceived;
57     sessionListener_.OnMessageReceived = AppDataListenerWrap::OnMessageReceived;
58 }
59 
~SoftBusAdapter()60 SoftBusAdapter::~SoftBusAdapter()
61 {
62     if (taskQueue_ != nullptr) {
63         taskQueue_->Clean();
64         taskQueue_ = nullptr;
65     }
66     dataCaches_.clear();
67     sessionIds_.clear();
68 }
69 
StartWatchDeviceChange(const AppDeviceStatusChangeListener * observer,const PipeInfo & pipeInfo)70 Status SoftBusAdapter::StartWatchDeviceChange(
71     const AppDeviceStatusChangeListener *observer, __attribute__((unused)) const PipeInfo &pipeInfo)
72 {
73     LOG_INFO("begin");
74     if (observer == nullptr) {
75         LOG_WARN("observer is null.");
76         return Status::ERROR;
77     }
78     std::lock_guard<std::mutex> lock(deviceChangeMutex_);
79     auto result = listeners_.insert(observer);
80     if (!result.second) {
81         LOG_WARN("Add listener error.");
82         return Status::ERROR;
83     }
84     LOG_INFO("end");
85     return Status::SUCCESS;
86 }
87 
StopWatchDeviceChange(const AppDeviceStatusChangeListener * observer,const PipeInfo & pipeInfo)88 Status SoftBusAdapter::StopWatchDeviceChange(
89     const AppDeviceStatusChangeListener *observer, __attribute__((unused)) const PipeInfo &pipeInfo)
90 {
91     LOG_INFO("begin");
92     if (observer == nullptr) {
93         LOG_WARN("observer is null.");
94         return Status::ERROR;
95     }
96     std::lock_guard<std::mutex> lock(deviceChangeMutex_);
97     auto result = listeners_.erase(observer);
98     if (result <= 0) {
99         return Status::ERROR;
100     }
101     LOG_INFO("end");
102     return Status::SUCCESS;
103 }
104 
NotifyAll(const DeviceInfo & deviceInfo,const DeviceChangeType & type)105 void SoftBusAdapter::NotifyAll(const DeviceInfo &deviceInfo, const DeviceChangeType &type)
106 {
107     std::thread th = std::thread([this, deviceInfo, type]() {
108         pthread_setname_np(pthread_self(), "Data_Object_NotifyAll");
109         std::vector<const AppDeviceStatusChangeListener *> listeners;
110         {
111             std::lock_guard<std::mutex> lock(deviceChangeMutex_);
112             for (const auto &listener : listeners_) {
113                 listeners.push_back(listener);
114             }
115         }
116         LOG_DEBUG("high");
117         std::string uuid = DevManager::GetInstance()->GetUuidByNodeId(deviceInfo.deviceId);
118         LOG_DEBUG("[Notify] to DB from: %{public}s, type:%{public}d", ToBeAnonymous(uuid).c_str(), type);
119         UpdateRelationship(deviceInfo.deviceId, type);
120         for (const auto &device : listeners) {
121             if (device == nullptr) {
122                 continue;
123             }
124             if (device->GetChangeLevelType() == ChangeLevelType::HIGH) {
125                 DeviceInfo di = { uuid, deviceInfo.deviceName, deviceInfo.deviceType };
126                 device->OnDeviceChanged(di, type);
127                 break;
128             }
129         }
130         LOG_DEBUG("low");
131         for (const auto &device : listeners) {
132             if (device == nullptr) {
133                 continue;
134             }
135             if (device->GetChangeLevelType() == ChangeLevelType::LOW) {
136                 DeviceInfo di = { uuid, deviceInfo.deviceName, deviceInfo.deviceType };
137                 device->OnDeviceChanged(di, DeviceChangeType::DEVICE_OFFLINE);
138                 device->OnDeviceChanged(di, type);
139             }
140         }
141         LOG_DEBUG("min");
142         for (const auto &device : listeners) {
143             if (device == nullptr) {
144                 continue;
145             }
146             if (device->GetChangeLevelType() == ChangeLevelType::MIN) {
147                 DeviceInfo di = { uuid, deviceInfo.deviceName, deviceInfo.deviceType };
148                 device->OnDeviceChanged(di, type);
149             }
150         }
151     });
152     th.detach();
153 }
154 
GetDeviceList() const155 std::vector<DeviceInfo> SoftBusAdapter::GetDeviceList() const
156 {
157     std::vector<DeviceInfo> dis;
158     NodeBasicInfo *info = nullptr;
159     int32_t infoNum = 0;
160     dis.clear();
161 
162     int32_t ret = GetAllNodeDeviceInfo("ohos.objectstore", &info, &infoNum);
163     if (ret != SOFTBUS_OK) {
164         LOG_ERROR("GetAllNodeDeviceInfo error");
165         return dis;
166     }
167     LOG_INFO("GetAllNodeDeviceInfo success infoNum=%{public}d", infoNum);
168 
169     for (int i = 0; i < infoNum; i++) {
170         std::string uuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(info[i].networkId));
171         DeviceInfo deviceInfo = { uuid, std::string(info[i].deviceName), std::to_string(info[i].deviceTypeId) };
172         dis.push_back(deviceInfo);
173     }
174     if (info != nullptr) {
175         FreeNodeInfo(info);
176     }
177     return dis;
178 }
179 
GetLocalDevice()180 DeviceInfo SoftBusAdapter::GetLocalDevice()
181 {
182     if (!localInfo_.deviceId.empty()) {
183         return localInfo_;
184     }
185 
186     NodeBasicInfo info;
187     int32_t ret = GetLocalNodeDeviceInfo("ohos.objectstore", &info);
188     if (ret != SOFTBUS_OK) {
189         LOG_ERROR("GetLocalNodeDeviceInfo error");
190         return DeviceInfo();
191     }
192     std::string uuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(info.networkId));
193     LOG_DEBUG("[LocalDevice] id:%{private}s, name:%{private}s, type:%{private}d", ToBeAnonymous(uuid).c_str(),
194         info.deviceName, info.deviceTypeId);
195     localInfo_ = { uuid, std::string(info.deviceName), std::to_string(info.deviceTypeId) };
196     return localInfo_;
197 }
198 
GetLocalBasicInfo() const199 DeviceInfo SoftBusAdapter::GetLocalBasicInfo() const
200 {
201     LOG_DEBUG("begin");
202     NodeBasicInfo info;
203     int32_t ret = GetLocalNodeDeviceInfo("ohos.objectstore", &info);
204     if (ret != SOFTBUS_OK) {
205         LOG_ERROR("GetLocalNodeDeviceInfo error");
206         return DeviceInfo();
207     }
208     LOG_DEBUG("[LocalBasicInfo] networkId:%{private}s, name:%{private}s, "
209               "type:%{private}d",
210         ToBeAnonymous(std::string(info.networkId)).c_str(), info.deviceName, info.deviceTypeId);
211     DeviceInfo localInfo = { std::string(info.networkId), std::string(info.deviceName),
212         std::to_string(info.deviceTypeId) };
213     return localInfo;
214 }
215 
GetRemoteNodesBasicInfo() const216 std::vector<DeviceInfo> SoftBusAdapter::GetRemoteNodesBasicInfo() const
217 {
218     LOG_DEBUG("begin");
219     std::vector<DeviceInfo> dis;
220     NodeBasicInfo *info = nullptr;
221     int32_t infoNum = 0;
222     dis.clear();
223 
224     int32_t ret = GetAllNodeDeviceInfo("ohos.objectstore", &info, &infoNum);
225     if (ret != SOFTBUS_OK) {
226         LOG_ERROR("GetAllNodeDeviceInfo error");
227         return dis;
228     }
229     LOG_DEBUG("GetAllNodeDeviceInfo success infoNum=%{public}d", infoNum);
230 
231     for (int i = 0; i < infoNum; i++) {
232         dis.push_back(
233             { std::string(info[i].networkId), std::string(info[i].deviceName), std::to_string(info[i].deviceTypeId) });
234     }
235     if (info != nullptr) {
236         FreeNodeInfo(info);
237     }
238     return dis;
239 }
240 
UpdateRelationship(const std::string & networkid,const DeviceChangeType & type)241 void SoftBusAdapter::UpdateRelationship(const std::string &networkid, const DeviceChangeType &type)
242 {
243     auto uuid = DevManager::GetInstance()->GetUuidByNodeId(networkid);
244     lock_guard<mutex> lock(networkMutex_);
245     switch (type) {
246         case DeviceChangeType::DEVICE_OFFLINE: {
247             auto size = this->networkId2Uuid_.erase(networkid);
248             if (size == 0) {
249                 LOG_WARN("not found id:%{public}s.", networkid.c_str());
250             }
251             break;
252         }
253         case DeviceChangeType::DEVICE_ONLINE: {
254             std::pair<std::string, std::string> value = { networkid, uuid };
255             auto res = this->networkId2Uuid_.insert(std::move(value));
256             if (!res.second) {
257                 LOG_WARN("insert failed.");
258             }
259             break;
260         }
261         default: {
262             LOG_WARN("unknown type.");
263             break;
264         }
265     }
266 }
ToNodeID(const std::string & nodeId) const267 std::string SoftBusAdapter::ToNodeID(const std::string &nodeId) const
268 {
269     {
270         lock_guard<mutex> lock(networkMutex_);
271         for (auto const &e : networkId2Uuid_) {
272             if (nodeId == e.second) { // id is uuid
273                 return e.first;
274             }
275         }
276     }
277 
278     LOG_WARN("get the network id from devices.");
279     std::vector<DeviceInfo> devices;
280     NodeBasicInfo *info = nullptr;
281     int32_t infoNum = 0;
282     std::string networkId;
283     int32_t ret = GetAllNodeDeviceInfo("ohos.objectstore", &info, &infoNum);
284     if (ret == SOFTBUS_OK) {
285         lock_guard<mutex> lock(networkMutex_);
286         for (int i = 0; i < infoNum; i++) {
287             if (networkId2Uuid_.find(info[i].networkId) != networkId2Uuid_.end()) {
288                 continue;
289             }
290             auto uuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(info[i].networkId));
291             networkId2Uuid_.insert({ info[i].networkId, uuid });
292             if (uuid == nodeId) {
293                 networkId = info[i].networkId;
294             }
295         }
296     }
297     if (info != nullptr) {
298         FreeNodeInfo(info);
299     }
300     return networkId;
301 }
302 
ToBeAnonymous(const std::string & name)303 std::string SoftBusAdapter::ToBeAnonymous(const std::string &name)
304 {
305     if (name.length() <= HEAD_SIZE) {
306         return DEFAULT_ANONYMOUS;
307     }
308 
309     if (name.length() < MIN_SIZE) {
310         return (name.substr(0, HEAD_SIZE) + REPLACE_CHAIN);
311     }
312 
313     return (name.substr(0, HEAD_SIZE) + REPLACE_CHAIN + name.substr(name.length() - END_SIZE, END_SIZE));
314 }
315 
GetInstance()316 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
317 {
318     static std::once_flag onceFlag;
319     std::call_once(onceFlag, [&] { instance_ = std::make_shared<SoftBusAdapter>(); });
320     return instance_;
321 }
322 
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)323 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
324 {
325     LOG_DEBUG("begin");
326     if (observer == nullptr) {
327         return Status::INVALID_ARGUMENT;
328     }
329     lock_guard<mutex> lock(dataChangeMutex_);
330     auto it = dataChangeListeners_.find(pipeInfo.pipeId);
331     if (it != dataChangeListeners_.end()) {
332         LOG_WARN("Add listener error or repeated adding.");
333         return Status::ERROR;
334     }
335     LOG_DEBUG("current appid %{public}s", pipeInfo.pipeId.c_str());
336     dataChangeListeners_.insert({ pipeInfo.pipeId, observer });
337     return Status::SUCCESS;
338 }
339 
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)340 Status SoftBusAdapter::StopWatchDataChange(
341     __attribute__((unused)) const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
342 {
343     LOG_DEBUG("begin");
344     lock_guard<mutex> lock(dataChangeMutex_);
345     if (dataChangeListeners_.erase(pipeInfo.pipeId)) {
346         return Status::SUCCESS;
347     }
348     LOG_WARN("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
349     return Status::ERROR;
350 }
351 
CalcRecOperate(uint32_t dataSize,RouteType currentRouteType,bool & isP2P) const352 RecOperate SoftBusAdapter::CalcRecOperate(uint32_t dataSize, RouteType currentRouteType, bool &isP2P) const
353 {
354     if (currentRouteType == RouteType::WIFI_STA) {
355         return KEEP;
356     }
357     if (currentRouteType == RouteType::INVALID_ROUTE_TYPE || currentRouteType == BT_BR || currentRouteType == BT_BLE) {
358         if (dataSize > P2P_SIZE_THRESHOLD) {
359             isP2P = true;
360             return CHANGE_TO_P2P;
361         }
362     }
363     if (currentRouteType == WIFI_P2P) {
364         if (dataSize < P2P_SIZE_THRESHOLD * SWITCH_DELAY_FACTOR) {
365             return CHANGE_TO_BLUETOOTH;
366         }
367         isP2P = true;
368     }
369     return KEEP;
370 }
371 
GetSessionAttribute(bool isP2P)372 SessionAttribute SoftBusAdapter::GetSessionAttribute(bool isP2P)
373 {
374     SessionAttribute attr;
375     attr.dataType = TYPE_BYTES;
376     // If the dataType is BYTES, the default strategy is wifi_5G > wifi_2.4G > BR, without P2P;
377     if (!isP2P) {
378         return attr;
379     }
380 
381     int index = 0;
382     attr.linkType[index++] = LINK_TYPE_WIFI_WLAN_5G;
383     attr.linkType[index++] = LINK_TYPE_WIFI_WLAN_2G;
384     attr.linkType[index++] = LINK_TYPE_WIFI_P2P;
385     attr.linkType[index++] = LINK_TYPE_BR;
386     attr.linkTypeNum = index;
387     return attr;
388 }
389 
CacheData(const std::string & deviceId,const DataInfo & dataInfo)390 Status SoftBusAdapter::CacheData(const std::string &deviceId, const DataInfo &dataInfo)
391 {
392     uint8_t *data = new uint8_t[dataInfo.length];
393     if (data == nullptr) {
394         LOG_ERROR("[SendData] create buffer fail.");
395         return Status::INVALID_ARGUMENT;
396     }
397     if (memcpy_s(data, dataInfo.length, dataInfo.data, dataInfo.length) != EOK) {
398         LOG_ERROR("[SendData] memcpy_s fail.");
399         delete[] data;
400         return Status::INVALID_ARGUMENT;
401     }
402     BytesMsg bytesMsg = { data, dataInfo.length };
403     lock_guard<mutex> lock(deviceDataLock_);
404     auto deviceIdData = dataCaches_.find(deviceId);
405     if (deviceIdData == dataCaches_.end()) {
406         dataCaches_[deviceId] = { bytesMsg };
407     } else {
408         deviceIdData->second.push_back(bytesMsg);
409         if (deviceIdData->second.size() > VECTOR_SIZE_THRESHOLD) {
410             deviceIdData->second.erase(deviceIdData->second.begin());
411         }
412     }
413     return Status::SUCCESS;
414 }
415 
GetSize(const std::string & deviceId)416 uint32_t SoftBusAdapter::GetSize(const std::string &deviceId)
417 {
418     uint32_t dataSize = 0;
419     lock_guard<mutex> lock(deviceDataLock_);
420     auto it = dataCaches_.find(deviceId);
421     if (it == dataCaches_.end() || it->second.empty()) {
422         return dataSize;
423     }
424     for (const auto &msg : it->second) {
425         dataSize += msg.size;
426     }
427     return dataSize;
428 }
429 
GetRouteType(int sessionId)430 RouteType SoftBusAdapter::GetRouteType(int sessionId)
431 {
432     RouteType routeType = RouteType::INVALID_ROUTE_TYPE;
433     int ret = GetSessionOption(sessionId, SESSION_OPTION_LINK_TYPE, &routeType, sizeof(routeType));
434     if (ret != SOFTBUS_OK) {
435         LOG_ERROR("getSessionOption failed, ret is %{public}d.", ret);
436     }
437     return routeType;
438 }
439 
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t totalLength,const MessageInfo & info)440 Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
441     uint32_t totalLength, const MessageInfo &info)
442 {
443     lock_guard<mutex> lock(sendDataMutex_);
444     auto result = CacheData(deviceId.deviceId, dataInfo);
445     if (result != Status::SUCCESS) {
446         return result;
447     }
448     uint32_t dataSize = GetSize(deviceId.deviceId) + totalLength;
449     int sessionId = GetSessionId(deviceId.deviceId);
450     bool isP2P = false;
451     RouteType currentRouteType = GetRouteType(sessionId);
452     RecOperate recOperate = CalcRecOperate(dataSize, currentRouteType, isP2P);
453     if (recOperate != KEEP && sessionId != INVALID_SESSION_ID) {
454         LOG_INFO("close session : %{public}d, currentRouteType : %{public}d, recOperate : %{public}d", sessionId,
455             currentRouteType, recOperate);
456         CloseSession(sessionId);
457     }
458     SessionAttribute attr = GetSessionAttribute(isP2P);
459     OpenSession(pipeInfo.pipeId.c_str(), pipeInfo.pipeId.c_str(), ToNodeID(deviceId.deviceId).c_str(),
460         "GROUP_ID", &attr);
461     return Status::SUCCESS;
462 }
463 
CacheSession(int32_t sessionId)464 void SoftBusAdapter::CacheSession(int32_t sessionId)
465 {
466     std::string deviceId;
467     if (GetDeviceId(sessionId, deviceId) == SOFTBUS_OK) {
468         lock_guard<mutex> lock(deviceSessionLock_);
469         sessionIds_[deviceId] = sessionId;
470     }
471 }
472 
DoSend()473 void SoftBusAdapter::DoSend()
474 {
475     auto task([this]() {
476         lock_guard<mutex> lockSession(deviceSessionLock_);
477         for (auto &it : sessionIds_) {
478             if (it.second == INVALID_SESSION_ID) {
479                 continue;
480             }
481             lock_guard<mutex> lock(deviceDataLock_);
482             auto dataCache = dataCaches_.find(it.first);
483             if (dataCache == dataCaches_.end()) {
484                 continue;
485             }
486             for (auto msg = dataCache->second.begin(); msg != dataCache->second.end();) {
487                 auto &byteMsg = *msg;
488                 int32_t ret = SendBytes(it.second, byteMsg.ptr, byteMsg.size);
489                 if (ret != SOFTBUS_OK) {
490                     LOG_ERROR("[SendBytes] to %{public}d failed, ret:%{public}d.", it.second, ret);
491                 }
492                 LOG_DEBUG("[SendBytes] delete msgPtr.");
493                 delete[] byteMsg.ptr;
494                 msg = dataCache->second.erase(msg);
495             }
496         }
497     });
498     taskQueue_->Execute(std::move(task));
499 }
500 
GetDeviceId(int sessionId,std::string & deviceId)501 int SoftBusAdapter::GetDeviceId(int sessionId, std::string &deviceId)
502 {
503     char peerDevId[DEVICE_ID_SIZE_MAX] = "";
504     int ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
505     if (ret != SOFTBUS_OK) {
506         LOG_ERROR("get peerDeviceId failed, sessionId :%{public}d", sessionId);
507         return ret;
508     }
509     lock_guard<mutex> lock(networkMutex_);
510     auto it = networkId2Uuid_.find(std::string(peerDevId));
511     if (it != networkId2Uuid_.end()) {
512         deviceId = it->second;
513     }
514     return ret;
515 }
516 
GetSessionId(const std::string & deviceId)517 int SoftBusAdapter::GetSessionId(const std::string &deviceId)
518 {
519     lock_guard<mutex> lock(deviceSessionLock_);
520     auto it = sessionIds_.find(deviceId);
521     if (it != sessionIds_.end()) {
522         return it->second;
523     }
524     return INVALID_SESSION_ID;
525 }
526 
OnSessionOpen(int32_t sessionId,int32_t status)527 void SoftBusAdapter::OnSessionOpen(int32_t sessionId, int32_t status)
528 {
529     if (status != SOFTBUS_OK) {
530         LOG_DEBUG("[OnSessionOpen] OpenSession failed");
531         return;
532     }
533     if (GetSessionSide(sessionId) != SELF_SIDE) {
534         return;
535     }
536     CacheSession(sessionId);
537     DoSend();
538 }
539 
OnSessionClose(int32_t sessionId)540 void SoftBusAdapter::OnSessionClose(int32_t sessionId)
541 {
542     std::string deviceId;
543     if (GetSessionSide(sessionId) != SELF_SIDE) {
544         return;
545     }
546     if (GetDeviceId(sessionId, deviceId) == SOFTBUS_OK) {
547         lock_guard<mutex> lock(deviceSessionLock_);
548         if (sessionIds_.find(deviceId) != sessionIds_.end()) {
549             sessionIds_.erase(deviceId);
550         }
551     }
552     DoSend();
553 }
554 
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)555 bool SoftBusAdapter::IsSameStartedOnPeer(
556     const struct PipeInfo &pipeInfo, __attribute__((unused)) const struct DeviceId &peer)
557 {
558     LOG_INFO(
559         "pipeInfo:%{public}s peer.deviceId:%{public}s", pipeInfo.pipeId.c_str(), ToBeAnonymous(peer.deviceId).c_str());
560     {
561         lock_guard<mutex> lock(busSessionMutex_);
562         if (busSessionMap_.find(pipeInfo.pipeId + peer.deviceId) != busSessionMap_.end()) {
563             LOG_INFO("Found session in map. Return true.");
564             return true;
565         }
566     }
567     SessionAttribute attr;
568     attr.dataType = TYPE_BYTES;
569     int sessionId = OpenSession(
570         pipeInfo.pipeId.c_str(), pipeInfo.pipeId.c_str(), ToNodeID(peer.deviceId).c_str(), "GROUP_ID", &attr);
571     LOG_INFO("[IsSameStartedOnPeer] sessionId=%{public}d", sessionId);
572     if (sessionId == INVALID_SESSION_ID) {
573         LOG_ERROR("OpenSession return null, pipeInfo:%{public}s. Return false.", pipeInfo.pipeId.c_str());
574         return false;
575     }
576     LOG_INFO("session started, pipeInfo:%{public}s. sessionId:%{public}d Return "
577              "true. ",
578         pipeInfo.pipeId.c_str(), sessionId);
579     return true;
580 }
581 
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)582 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
583 {
584     LOG_INFO("pipeInfo: %{public}s flag: %{public}d", pipeInfo.pipeId.c_str(), static_cast<bool>(flag));
585     flag_ = flag;
586 }
587 
CreateSessionServerAdapter(const std::string & sessionName)588 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
589 {
590     LOG_DEBUG("begin");
591     return CreateSessionServer("ohos.objectstore", sessionName.c_str(), &sessionListener_);
592 }
593 
RemoveSessionServerAdapter(const std::string & sessionName) const594 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
595 {
596     LOG_DEBUG("begin");
597     return RemoveSessionServer("ohos.objectstore", sessionName.c_str());
598 }
599 
InsertSession(const std::string & sessionName)600 void SoftBusAdapter::InsertSession(const std::string &sessionName)
601 {
602     lock_guard<mutex> lock(busSessionMutex_);
603     busSessionMap_.insert({ sessionName, true });
604 }
605 
DeleteSession(const std::string & sessionName)606 void SoftBusAdapter::DeleteSession(const std::string &sessionName)
607 {
608     lock_guard<mutex> lock(busSessionMutex_);
609     busSessionMap_.erase(sessionName);
610 }
611 
NotifyDataListeners(const uint8_t * ptr,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)612 void SoftBusAdapter::NotifyDataListeners(
613     const uint8_t *ptr, const int size, const std::string &deviceId, const PipeInfo &pipeInfo)
614 {
615     LOG_DEBUG("begin");
616     lock_guard<mutex> lock(dataChangeMutex_);
617     auto it = dataChangeListeners_.find(pipeInfo.pipeId);
618     if (it != dataChangeListeners_.end()) {
619         LOG_DEBUG("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
620             ToBeAnonymous(deviceId).c_str());
621         DeviceInfo deviceInfo = { deviceId, "", "" };
622         it->second->OnMessage(deviceInfo, ptr, size, pipeInfo);
623         return;
624     }
625     LOG_WARN("no listener %{public}s.", pipeInfo.pipeId.c_str());
626 }
627 
SetDataHandler(SoftBusAdapter * handler)628 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
629 {
630     LOG_INFO("begin");
631     softBusAdapter_ = handler;
632 }
633 
OnSessionOpened(int sessionId,int result)634 int AppDataListenerWrap::OnSessionOpened(int sessionId, int result)
635 {
636     LOG_INFO("[SessionOpen] sessionId:%{public}d, result:%{public}d", sessionId, result);
637     char mySessionName[SESSION_NAME_SIZE_MAX] = "";
638     char peerSessionName[SESSION_NAME_SIZE_MAX] = "";
639     char peerDevId[DEVICE_ID_SIZE_MAX] = "";
640     softBusAdapter_->OnSessionOpen(sessionId, result);
641     if (result != SOFTBUS_OK) {
642         LOG_WARN("session %{public}d open failed, result:%{public}d.", sessionId, result);
643         return result;
644     }
645     int ret = GetMySessionName(sessionId, mySessionName, sizeof(mySessionName));
646     if (ret != SOFTBUS_OK) {
647         LOG_WARN("get my session name failed, session id is %{public}d.", sessionId);
648         return SOFTBUS_ERR;
649     }
650     ret = GetPeerSessionName(sessionId, peerSessionName, sizeof(peerSessionName));
651     if (ret != SOFTBUS_OK) {
652         LOG_WARN("get my peer session name failed, session id is %{public}d.", sessionId);
653         return SOFTBUS_ERR;
654     }
655     ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
656     if (ret != SOFTBUS_OK) {
657         LOG_WARN("get my peer device id failed, session id is %{public}d.", sessionId);
658         return SOFTBUS_ERR;
659     }
660     std::string peerUuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(peerDevId));
661     LOG_DEBUG("[SessionOpen] mySessionName:%{public}s, "
662               "peerSessionName:%{public}s, peerDevId:%{public}s",
663         mySessionName, peerSessionName, SoftBusAdapter::ToBeAnonymous(peerUuid).c_str());
664 
665     if (strlen(peerSessionName) < 1) {
666         softBusAdapter_->InsertSession(std::string(mySessionName) + peerUuid);
667     } else {
668         softBusAdapter_->InsertSession(std::string(peerSessionName) + peerUuid);
669     }
670     return 0;
671 }
672 
OnSessionClosed(int sessionId)673 void AppDataListenerWrap::OnSessionClosed(int sessionId)
674 {
675     LOG_INFO("[SessionClosed] sessionId:%{public}d", sessionId);
676     char mySessionName[SESSION_NAME_SIZE_MAX] = "";
677     char peerSessionName[SESSION_NAME_SIZE_MAX] = "";
678     char peerDevId[DEVICE_ID_SIZE_MAX] = "";
679 
680     softBusAdapter_->OnSessionClose(sessionId);
681     int ret = GetMySessionName(sessionId, mySessionName, sizeof(mySessionName));
682     if (ret != SOFTBUS_OK) {
683         LOG_WARN("get my session name failed, session id is %{public}d.", sessionId);
684         return;
685     }
686     ret = GetPeerSessionName(sessionId, peerSessionName, sizeof(peerSessionName));
687     if (ret != SOFTBUS_OK) {
688         LOG_WARN("get my peer session name failed, session id is %{public}d.", sessionId);
689         return;
690     }
691     ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
692     if (ret != SOFTBUS_OK) {
693         LOG_WARN("get my peer device id failed, session id is %{public}d.", sessionId);
694         return;
695     }
696     std::string peerUuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(peerDevId));
697     LOG_DEBUG("[SessionClosed] mySessionName:%{public}s, "
698               "peerSessionName:%{public}s, peerDevId:%{public}s",
699         mySessionName, peerSessionName, SoftBusAdapter::ToBeAnonymous(peerUuid).c_str());
700     if (strlen(peerSessionName) < 1) {
701         softBusAdapter_->DeleteSession(std::string(mySessionName) + peerUuid);
702     } else {
703         softBusAdapter_->DeleteSession(std::string(peerSessionName) + peerUuid);
704     }
705 }
706 
OnMessageReceived(int sessionId,const void * data,unsigned int dataLen)707 void AppDataListenerWrap::OnMessageReceived(int sessionId, const void *data, unsigned int dataLen)
708 {
709     LOG_INFO("begin");
710     if (sessionId == INVALID_SESSION_ID) {
711         return;
712     }
713     char peerSessionName[SESSION_NAME_SIZE_MAX] = "";
714     char peerDevId[DEVICE_ID_SIZE_MAX] = "";
715     int ret = GetPeerSessionName(sessionId, peerSessionName, sizeof(peerSessionName));
716     if (ret != SOFTBUS_OK) {
717         LOG_WARN("get my peer session name failed, session id is %{public}d.", sessionId);
718         return;
719     }
720     ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
721     if (ret != SOFTBUS_OK) {
722         LOG_WARN("get my peer device id failed, session id is %{public}d.", sessionId);
723         return;
724     }
725     std::string peerUuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(peerDevId));
726     LOG_DEBUG("[MessageReceived] session id:%{public}d, "
727               "peerSessionName:%{public}s, peerDevId:%{public}s",
728         sessionId, peerSessionName, SoftBusAdapter::ToBeAnonymous(peerUuid).c_str());
729     NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerUuid, { std::string(peerSessionName) });
730 }
731 
OnBytesReceived(int sessionId,const void * data,unsigned int dataLen)732 void AppDataListenerWrap::OnBytesReceived(int sessionId, const void *data, unsigned int dataLen)
733 {
734     LOG_INFO("begin");
735     if (sessionId == INVALID_SESSION_ID) {
736         return;
737     }
738     char peerSessionName[SESSION_NAME_SIZE_MAX] = "";
739     char peerDevId[DEVICE_ID_SIZE_MAX] = "";
740     int ret = GetPeerSessionName(sessionId, peerSessionName, sizeof(peerSessionName));
741     if (ret != SOFTBUS_OK) {
742         LOG_WARN("get my peer session name failed, session id is %{public}d.", sessionId);
743         return;
744     }
745     ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
746     if (ret != SOFTBUS_OK) {
747         LOG_WARN("get my peer device id failed, session id is %{public}d.", sessionId);
748         return;
749     }
750     std::string peerUuid = DevManager::GetInstance()->GetUuidByNodeId(std::string(peerDevId));
751     LOG_DEBUG("[BytesReceived] session id:%{public}d, peerSessionName:%{public}s, "
752               "peerDevId:%{public}s",
753         sessionId, peerSessionName, SoftBusAdapter::ToBeAnonymous(peerUuid).c_str());
754     NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerUuid, { std::string(peerSessionName) });
755 }
756 
NotifyDataListeners(const uint8_t * ptr,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)757 void AppDataListenerWrap::NotifyDataListeners(
758     const uint8_t *ptr, const int size, const std::string &deviceId, const PipeInfo &pipeInfo)
759 {
760     return softBusAdapter_->NotifyDataListeners(ptr, size, deviceId, pipeInfo);
761 }
762 } // namespace ObjectStore
763 } // namespace OHOS
764