• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <mutex>
16 #include <thread>
17 
18 #include "device_manager_adapter.h"
19 #include "dfx_types.h"
20 #include "kvstore_utils.h"
21 #include "log_print.h"
22 #include "reporter.h"
23 #include "securec.h"
24 #include "session.h"
25 #include "softbus_adapter.h"
26 #include "softbus_bus_center.h"
27 #include "softbus_error_code.h"
28 #ifdef LOG_TAG
29 #undef LOG_TAG
30 #endif
31 #define LOG_TAG "SoftBusAdapter"
32 
33 namespace OHOS {
34 namespace AppDistributedKv {
35 enum SoftBusAdapterErrorCode : int32_t {
36     SESSION_ID_INVALID = 2,
37     MY_SESSION_NAME_INVALID,
38     PEER_SESSION_NAME_INVALID,
39     PEER_DEVICE_ID_INVALID,
40     SESSION_SIDE_INVALID,
41     ROUTE_TYPE_INVALID,
42 };
43 constexpr int32_t SESSION_NAME_SIZE_MAX = 65;
44 constexpr int32_t DEVICE_ID_SIZE_MAX = 65;
45 constexpr uint32_t DEFAULT_MTU_SIZE = 4096u;
46 using namespace std;
47 using namespace OHOS::DistributedDataDfx;
48 using namespace OHOS::DistributedKv;
49 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
50 using Strategy = CommunicationStrategy::Strategy;
51 struct ConnDetailsInfo {
52     char myName[SESSION_NAME_SIZE_MAX] = "";
53     char peerName[SESSION_NAME_SIZE_MAX] = "";
54     std::string peerDevUuid;
55     int32_t side = -1;
56     int32_t routeType = -1;
57 };
58 class AppDataListenerWrap {
59 public:
60     static void SetDataHandler(SoftBusAdapter *handler);
61     static int OnConnectOpened(int connId, int result);
62     static void OnConnectClosed(int connId);
63     static void OnBytesReceived(int connId, const void *data, unsigned int dataLen);
64 
65 public:
66     // notify all listeners when received message
67     static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
68         const PipeInfo &pipeInfo);
69 
70 private:
71     static int GetConnDetailsInfo(int connId, ConnDetailsInfo &connInfo);
72     static SoftBusAdapter *softBusAdapter_;
73 };
74 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
75 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
76 
77 namespace {
NotCareEvent(NodeBasicInfo * info)78 void NotCareEvent(NodeBasicInfo *info)
79 {
80     return;
81 }
82 
NotCareEvent(NodeBasicInfoType type,NodeBasicInfo * info)83 void NotCareEvent(NodeBasicInfoType type, NodeBasicInfo *info)
84 {
85     return;
86 }
87 
OnCareEvent(NodeStatusType type,NodeStatus * status)88 void OnCareEvent(NodeStatusType type, NodeStatus *status)
89 {
90     if (type != TYPE_DATABASE_STATUS || status == nullptr) {
91         return;
92     }
93     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(status->basicInfo.networkId);
94     SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, status->dataBaseStatus);
95 }
96 
97 INodeStateCb g_callback = {
98     .events = EVENT_NODE_STATUS_CHANGED,
99     .onNodeOnline = NotCareEvent,
100     .onNodeOffline = NotCareEvent,
101     .onNodeBasicInfoChanged = NotCareEvent,
102     .onNodeStatusChanged = OnCareEvent,
103 };
104 } // namespace
105 SoftBusAdapter::SofBusDeviceChangeListenerImpl SoftBusAdapter::listener_;
SoftBusAdapter()106 SoftBusAdapter::SoftBusAdapter()
107 {
108     ZLOGI("begin");
109     AppDataListenerWrap::SetDataHandler(this);
110 
111     sessionListener_.OnSessionOpened = AppDataListenerWrap::OnConnectOpened;
112     sessionListener_.OnSessionClosed = AppDataListenerWrap::OnConnectClosed;
113     sessionListener_.OnBytesReceived = AppDataListenerWrap::OnBytesReceived;
114     sessionListener_.OnMessageReceived = AppDataListenerWrap::OnBytesReceived;
115 
116     auto status = DmAdapter::GetInstance().StartWatchDeviceChange(&listener_, {"softBusAdapter"});
117     if (status != Status::SUCCESS) {
118         ZLOGW("register device change failed, status:%d", static_cast<int>(status));
119     }
120 }
121 
~SoftBusAdapter()122 SoftBusAdapter::~SoftBusAdapter()
123 {
124     ZLOGI("begin");
125     if (onBroadcast_) {
126         UnregNodeDeviceStateCb(&g_callback);
127     }
128     connects_.clear();
129 }
130 
GetInstance()131 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
132 {
133     static std::once_flag onceFlag;
134     std::call_once(onceFlag, [&] { instance_ = std::make_shared<SoftBusAdapter>(); });
135     return instance_;
136 }
137 
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)138 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
139 {
140     ZLOGD("begin");
141     if (observer == nullptr) {
142         return Status::INVALID_ARGUMENT;
143     }
144 
145     auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
146     if (!ret) {
147         ZLOGW("Add listener error or repeated adding.");
148         return Status::ERROR;
149     }
150 
151     return Status::SUCCESS;
152 }
153 
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)154 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
155     const PipeInfo &pipeInfo)
156 {
157     ZLOGD("begin");
158     if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
159         return Status::SUCCESS;
160     }
161     ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
162     return Status::ERROR;
163 }
164 
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t totalLength,const MessageInfo & info)165 Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
166     uint32_t totalLength, const MessageInfo &info)
167 {
168     std::shared_ptr<SoftBusClient> conn;
169     {
170         lock_guard<mutex> lock(connMutex_);
171         std::string key = pipeInfo.pipeId + deviceId.deviceId;
172         if (connects_.find(key) == connects_.end()) {
173             connects_.emplace(key, std::make_shared<SoftBusClient>(pipeInfo, deviceId, [this](int32_t connId) {
174                 return GetSessionStatus(connId);
175             }));
176         }
177         conn = connects_[key];
178     }
179 
180     if (conn != nullptr) {
181         return conn->Send(dataInfo, totalLength);
182     }
183 
184     return Status::ERROR;
185 }
186 
GetConnect(const std::string & deviceId)187 std::shared_ptr<SoftBusClient> SoftBusAdapter::GetConnect(const std::string &deviceId)
188 {
189     lock_guard<mutex> lock(connMutex_);
190     for (const auto& conn : connects_) {
191         if (*conn.second == deviceId) {
192             return conn.second;
193         }
194     }
195     return nullptr;
196 }
197 
GetMtuSize(const DeviceId & deviceId)198 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
199 {
200     std::shared_ptr<SoftBusClient> conn = GetConnect(deviceId.deviceId);
201     if (conn != nullptr) {
202         return conn->GetMtuSize();
203     }
204     return DEFAULT_MTU_SIZE;
205 }
206 
DelConnect(int32_t connId)207 std::string SoftBusAdapter::DelConnect(int32_t connId)
208 {
209     lock_guard<mutex> lock(connMutex_);
210     std::string name;
211     for (const auto& conn : connects_) {
212         if (*conn.second == connId) {
213             name = conn.first;
214             connects_.erase(conn.first);
215             break;
216         }
217     }
218     return name;
219 }
220 
DelSessionStatus(int32_t connId)221 void SoftBusAdapter::DelSessionStatus(int32_t connId)
222 {
223     lock_guard<mutex> lock(statusMutex_);
224     auto it = sessionsStatus_.find(connId);
225     if (it != sessionsStatus_.end()) {
226         it->second->Clear(SOFTBUS_ERR);
227         sessionsStatus_.erase(it);
228     }
229 }
230 
GetSessionStatus(int32_t connId)231 int32_t SoftBusAdapter::GetSessionStatus(int32_t connId)
232 {
233     auto semaphore = GetSemaphore(connId);
234     return semaphore->GetValue();
235 }
236 
OnSessionOpen(int32_t connId,int32_t status)237 void SoftBusAdapter::OnSessionOpen(int32_t connId, int32_t status)
238 {
239     auto semaphore = GetSemaphore(connId);
240     semaphore->SetValue(status);
241 }
242 
OnSessionClose(int32_t connId)243 std::string SoftBusAdapter::OnSessionClose(int32_t connId)
244 {
245     DelSessionStatus(connId);
246     return DelConnect(connId);
247 }
248 
GetSemaphore(int32_t connId)249 std::shared_ptr<BlockData<int32_t>> SoftBusAdapter::GetSemaphore(int32_t connId)
250 {
251     lock_guard<mutex> lock(statusMutex_);
252     if (sessionsStatus_.find(connId) == sessionsStatus_.end()) {
253         sessionsStatus_.emplace(connId, std::make_shared<BlockData<int32_t>>(WAIT_MAX_TIME, SOFTBUS_ERR));
254     }
255     return sessionsStatus_[connId];
256 }
257 
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)258 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
259     __attribute__((unused)) const struct DeviceId &peer)
260 {
261     ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
262           KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
263     return true;
264 }
265 
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)266 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
267 {
268     ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
269     flag_ = flag;
270 }
271 
CreateSessionServerAdapter(const std::string & sessionName)272 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
273 {
274     ZLOGD("begin");
275     return CreateSessionServer("ohos.distributeddata", sessionName.c_str(), &sessionListener_);
276 }
277 
RemoveSessionServerAdapter(const std::string & sessionName) const278 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
279 {
280     ZLOGD("begin");
281     return RemoveSessionServer("ohos.distributeddata", sessionName.c_str());
282 }
283 
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)284 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
285     const PipeInfo &pipeInfo)
286 {
287     ZLOGD("begin");
288     auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
289         [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
290             ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
291                   KvStoreUtils::ToBeAnonymous(deviceId).c_str());
292             DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
293             value->OnMessage(deviceInfo, data, size, pipeInfo);
294             TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
295             Reporter::GetInstance()->TrafficStatistic()->Report(ts);
296             return true;
297         });
298     if (!ret) {
299         ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
300     }
301 }
302 
Broadcast(const PipeInfo & pipeInfo,uint16_t mask)303 int32_t SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, uint16_t mask)
304 {
305     return SetNodeDataChangeFlag(pipeInfo.pipeId.c_str(), DmAdapter::GetInstance().GetLocalDevice().networkId.c_str(),
306         mask);
307 }
308 
OnBroadcast(const DeviceId & device,uint16_t mask)309 void SoftBusAdapter::OnBroadcast(const DeviceId &device, uint16_t mask)
310 {
311     ZLOGI("device:%{public}s mask:0x%{public}x", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str(), mask);
312     if (!onBroadcast_) {
313         ZLOGW("no listener device:%{public}s mask:0x%{public}x",
314               KvStoreUtils::ToBeAnonymous(device.deviceId).c_str(), mask);
315         return;
316     }
317     onBroadcast_(device.deviceId, mask);
318 }
319 
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,uint16_t)> listener)320 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
321     std::function<void(const std::string &, uint16_t)> listener)
322 {
323     if (onBroadcast_) {
324         return SOFTBUS_ALREADY_EXISTED;
325     }
326     onBroadcast_ = std::move(listener);
327     return RegNodeDeviceStateCb(pipeInfo.pipeId.c_str(), &g_callback);
328 }
329 
SetDataHandler(SoftBusAdapter * handler)330 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
331 {
332     ZLOGI("begin");
333     softBusAdapter_ = handler;
334 }
335 
GetConnDetailsInfo(int connId,ConnDetailsInfo & connInfo)336 int AppDataListenerWrap::GetConnDetailsInfo(int connId, ConnDetailsInfo &connInfo)
337 {
338     if (connId < 0) {
339         return SESSION_ID_INVALID;
340     }
341 
342     int ret = GetMySessionName(connId, connInfo.myName, sizeof(connInfo.myName));
343     if (ret != SOFTBUS_OK) {
344         return MY_SESSION_NAME_INVALID;
345     }
346 
347     ret = GetPeerSessionName(connId, connInfo.peerName, sizeof(connInfo.peerName));
348     if (ret != SOFTBUS_OK) {
349         return PEER_SESSION_NAME_INVALID;
350     }
351 
352     char peerDevId[DEVICE_ID_SIZE_MAX] = "";
353     ret = GetPeerDeviceId(connId, peerDevId, sizeof(peerDevId));
354     if (ret != SOFTBUS_OK) {
355         return PEER_DEVICE_ID_INVALID;
356     }
357     connInfo.peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(peerDevId));
358 
359     connInfo.side = GetSessionSide(connId);
360     if (connInfo.side < 0) {
361         return SESSION_SIDE_INVALID;
362     }
363 
364     int32_t routeType = RouteType::INVALID_ROUTE_TYPE;
365     ret = GetSessionOption(connId, SESSION_OPTION_LINK_TYPE, &routeType, sizeof(routeType));
366     if (ret != SOFTBUS_OK) {
367         return ROUTE_TYPE_INVALID;
368     }
369     connInfo.routeType = routeType;
370 
371     return SOFTBUS_OK;
372 }
373 
OnConnectOpened(int connId,int result)374 int AppDataListenerWrap::OnConnectOpened(int connId, int result)
375 {
376     ZLOGI("[SessionOpen] connId:%{public}d, result:%{public}d", connId, result);
377     softBusAdapter_->OnSessionOpen(connId, result);
378     if (result != SOFTBUS_OK) {
379         ZLOGW("session %{public}d open failed, result:%{public}d.", connId, result);
380         return result;
381     }
382 
383     ConnDetailsInfo connInfo;
384     int ret = GetConnDetailsInfo(connId, connInfo);
385     if (ret != SOFTBUS_OK) {
386         ZLOGE("[SessionOpened] session id:%{public}d get info fail error: %{public}d", connId, ret);
387         return ret;
388     }
389 
390     ZLOGD("[OnConnectOpened] conn id:%{public}d, my name:%{public}s, peer name:%{public}s, "
391           "peer devId:%{public}s, side:%{public}d, routeType:%{public}d", connId, connInfo.myName, connInfo.peerName,
392           KvStoreUtils::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side, connInfo.routeType);
393     return 0;
394 }
395 
OnConnectClosed(int connId)396 void AppDataListenerWrap::OnConnectClosed(int connId)
397 {
398     // when the local close the session, this callback function will not be triggered;
399     // when the current function is called, soft bus has released the session resource, only connId is valid;
400     std::string name = softBusAdapter_->OnSessionClose(connId);
401     ZLOGI("[SessionClosed] connId:%{public}d, name:%{public}s", connId, KvStoreUtils::ToBeAnonymous(name).c_str());
402 }
403 
OnBytesReceived(int connId,const void * data,unsigned int dataLen)404 void AppDataListenerWrap::OnBytesReceived(int connId, const void *data, unsigned int dataLen)
405 {
406     ConnDetailsInfo connInfo;
407     int ret = GetConnDetailsInfo(connId, connInfo);
408     if (ret != SOFTBUS_OK) {
409         ZLOGE("[OnBytesReceived] session id:%{public}d get info fail error: %{public}d", connId, ret);
410         return;
411     }
412 
413     ZLOGD("[OnBytesReceived] conn id:%{public}d, peer name:%{public}s, "
414           "peer devId:%{public}s, side:%{public}d, data len:%{public}u", connId, connInfo.peerName,
415           KvStoreUtils::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side, dataLen);
416 
417     NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, connInfo.peerDevUuid,
418         { std::string(connInfo.peerName), "" });
419 }
420 
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)421 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
422     const PipeInfo &pipeInfo)
423 {
424     softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
425 }
426 
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const427 void SoftBusAdapter::SofBusDeviceChangeListenerImpl::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
428     const AppDistributedKv::DeviceChangeType &type) const
429 {
430     Strategy strategy = Strategy::BUTT;
431     switch (type) {
432         case AppDistributedKv::DeviceChangeType::DEVICE_ONLINE:
433             strategy = Strategy::ON_LINE_SELECT_CHANNEL;
434             break;
435         case AppDistributedKv::DeviceChangeType::DEVICE_ONREADY:
436             strategy = Strategy::DEFAULT;
437             break;
438         default:
439             break;
440     }
441 
442     if (strategy >= Strategy::BUTT) {
443         return;
444     }
445 
446     CommunicationStrategy::GetInstance().SetStrategy(info.uuid, strategy,
447         [this](const std::string &deviceId, Strategy strategy) {
448         std::shared_ptr<SoftBusClient> conn = SoftBusAdapter::GetInstance()->GetConnect(deviceId);
449         if (conn != nullptr) {
450             conn->AfterStrategyUpdate(strategy);
451         }
452     });
453 }
454 } // namespace AppDistributedKv
455 } // namespace OHOS
456