1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <thread>
17
18 #include "device_manager_adapter.h"
19 #include "dfx_types.h"
20 #include "kvstore_utils.h"
21 #include "log_print.h"
22 #include "reporter.h"
23 #include "securec.h"
24 #include "session.h"
25 #include "softbus_adapter.h"
26 #include "softbus_bus_center.h"
27 #include "softbus_error_code.h"
28 #ifdef LOG_TAG
29 #undef LOG_TAG
30 #endif
31 #define LOG_TAG "SoftBusAdapter"
32
33 namespace OHOS {
34 namespace AppDistributedKv {
// Failure codes returned by AppDataListenerWrap::GetConnDetailsInfo, one per
// piece of session information that could not be obtained.
// NOTE(review): numbering starts at 2, presumably to stay clear of the
// SOFTBUS_OK / generic error values — confirm against softbus_error_code.h.
enum SoftBusAdapterErrorCode : int32_t {
    SESSION_ID_INVALID = 2,        // negative connection id
    MY_SESSION_NAME_INVALID = 3,   // GetMySessionName failed
    PEER_SESSION_NAME_INVALID = 4, // GetPeerSessionName failed
    PEER_DEVICE_ID_INVALID = 5,    // GetPeerDeviceId failed
    SESSION_SIDE_INVALID = 6,      // GetSessionSide returned a negative value
    ROUTE_TYPE_INVALID = 7,        // GetSessionOption(SESSION_OPTION_LINK_TYPE) failed
};
// Buffer size, in bytes, of the session-name arrays in ConnDetailsInfo.
// NOTE(review): presumably 64 characters plus the terminating NUL — confirm against the softbus limits.
constexpr int32_t SESSION_NAME_SIZE_MAX = 65;
// Buffer size, in bytes, used when querying the peer device id of a session.
constexpr int32_t DEVICE_ID_SIZE_MAX = 65;
// MTU reported by GetMtuSize when no connection to the requested device exists.
constexpr uint32_t DEFAULT_MTU_SIZE = 4096u;
46 using namespace std;
47 using namespace OHOS::DistributedDataDfx;
48 using namespace OHOS::DistributedKv;
49 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
50 using Strategy = CommunicationStrategy::Strategy;
51 struct ConnDetailsInfo {
52 char myName[SESSION_NAME_SIZE_MAX] = "";
53 char peerName[SESSION_NAME_SIZE_MAX] = "";
54 std::string peerDevUuid;
55 int32_t side = -1;
56 int32_t routeType = -1;
57 };
58 class AppDataListenerWrap {
59 public:
60 static void SetDataHandler(SoftBusAdapter *handler);
61 static int OnConnectOpened(int connId, int result);
62 static void OnConnectClosed(int connId);
63 static void OnBytesReceived(int connId, const void *data, unsigned int dataLen);
64
65 public:
66 // notify all listeners when received message
67 static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
68 const PipeInfo &pipeInfo);
69
70 private:
71 static int GetConnDetailsInfo(int connId, ConnDetailsInfo &connInfo);
72 static SoftBusAdapter *softBusAdapter_;
73 };
74 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
75 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
76
77 namespace {
NotCareEvent(NodeBasicInfo * info)78 void NotCareEvent(NodeBasicInfo *info)
79 {
80 return;
81 }
82
NotCareEvent(NodeBasicInfoType type,NodeBasicInfo * info)83 void NotCareEvent(NodeBasicInfoType type, NodeBasicInfo *info)
84 {
85 return;
86 }
87
OnCareEvent(NodeStatusType type,NodeStatus * status)88 void OnCareEvent(NodeStatusType type, NodeStatus *status)
89 {
90 if (type != TYPE_DATABASE_STATUS || status == nullptr) {
91 return;
92 }
93 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(status->basicInfo.networkId);
94 SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, status->dataBaseStatus);
95 }
96
97 INodeStateCb g_callback = {
98 .events = EVENT_NODE_STATUS_CHANGED,
99 .onNodeOnline = NotCareEvent,
100 .onNodeOffline = NotCareEvent,
101 .onNodeBasicInfoChanged = NotCareEvent,
102 .onNodeStatusChanged = OnCareEvent,
103 };
104 } // namespace
105 SoftBusAdapter::SofBusDeviceChangeListenerImpl SoftBusAdapter::listener_;
SoftBusAdapter()106 SoftBusAdapter::SoftBusAdapter()
107 {
108 ZLOGI("begin");
109 AppDataListenerWrap::SetDataHandler(this);
110
111 sessionListener_.OnSessionOpened = AppDataListenerWrap::OnConnectOpened;
112 sessionListener_.OnSessionClosed = AppDataListenerWrap::OnConnectClosed;
113 sessionListener_.OnBytesReceived = AppDataListenerWrap::OnBytesReceived;
114 sessionListener_.OnMessageReceived = AppDataListenerWrap::OnBytesReceived;
115
116 auto status = DmAdapter::GetInstance().StartWatchDeviceChange(&listener_, {"softBusAdapter"});
117 if (status != Status::SUCCESS) {
118 ZLOGW("register device change failed, status:%d", static_cast<int>(status));
119 }
120 }
121
~SoftBusAdapter()122 SoftBusAdapter::~SoftBusAdapter()
123 {
124 ZLOGI("begin");
125 if (onBroadcast_) {
126 UnregNodeDeviceStateCb(&g_callback);
127 }
128 connects_.clear();
129 }
130
GetInstance()131 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
132 {
133 static std::once_flag onceFlag;
134 std::call_once(onceFlag, [&] { instance_ = std::make_shared<SoftBusAdapter>(); });
135 return instance_;
136 }
137
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)138 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
139 {
140 ZLOGD("begin");
141 if (observer == nullptr) {
142 return Status::INVALID_ARGUMENT;
143 }
144
145 auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
146 if (!ret) {
147 ZLOGW("Add listener error or repeated adding.");
148 return Status::ERROR;
149 }
150
151 return Status::SUCCESS;
152 }
153
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)154 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
155 const PipeInfo &pipeInfo)
156 {
157 ZLOGD("begin");
158 if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
159 return Status::SUCCESS;
160 }
161 ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
162 return Status::ERROR;
163 }
164
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t totalLength,const MessageInfo & info)165 Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
166 uint32_t totalLength, const MessageInfo &info)
167 {
168 std::shared_ptr<SoftBusClient> conn;
169 {
170 lock_guard<mutex> lock(connMutex_);
171 std::string key = pipeInfo.pipeId + deviceId.deviceId;
172 if (connects_.find(key) == connects_.end()) {
173 connects_.emplace(key, std::make_shared<SoftBusClient>(pipeInfo, deviceId, [this](int32_t connId) {
174 return GetSessionStatus(connId);
175 }));
176 }
177 conn = connects_[key];
178 }
179
180 if (conn != nullptr) {
181 return conn->Send(dataInfo, totalLength);
182 }
183
184 return Status::ERROR;
185 }
186
GetConnect(const std::string & deviceId)187 std::shared_ptr<SoftBusClient> SoftBusAdapter::GetConnect(const std::string &deviceId)
188 {
189 lock_guard<mutex> lock(connMutex_);
190 for (const auto& conn : connects_) {
191 if (*conn.second == deviceId) {
192 return conn.second;
193 }
194 }
195 return nullptr;
196 }
197
GetMtuSize(const DeviceId & deviceId)198 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
199 {
200 std::shared_ptr<SoftBusClient> conn = GetConnect(deviceId.deviceId);
201 if (conn != nullptr) {
202 return conn->GetMtuSize();
203 }
204 return DEFAULT_MTU_SIZE;
205 }
206
DelConnect(int32_t connId)207 std::string SoftBusAdapter::DelConnect(int32_t connId)
208 {
209 lock_guard<mutex> lock(connMutex_);
210 std::string name;
211 for (const auto& conn : connects_) {
212 if (*conn.second == connId) {
213 name = conn.first;
214 connects_.erase(conn.first);
215 break;
216 }
217 }
218 return name;
219 }
220
DelSessionStatus(int32_t connId)221 void SoftBusAdapter::DelSessionStatus(int32_t connId)
222 {
223 lock_guard<mutex> lock(statusMutex_);
224 auto it = sessionsStatus_.find(connId);
225 if (it != sessionsStatus_.end()) {
226 it->second->Clear(SOFTBUS_ERR);
227 sessionsStatus_.erase(it);
228 }
229 }
230
GetSessionStatus(int32_t connId)231 int32_t SoftBusAdapter::GetSessionStatus(int32_t connId)
232 {
233 auto semaphore = GetSemaphore(connId);
234 return semaphore->GetValue();
235 }
236
OnSessionOpen(int32_t connId,int32_t status)237 void SoftBusAdapter::OnSessionOpen(int32_t connId, int32_t status)
238 {
239 auto semaphore = GetSemaphore(connId);
240 semaphore->SetValue(status);
241 }
242
OnSessionClose(int32_t connId)243 std::string SoftBusAdapter::OnSessionClose(int32_t connId)
244 {
245 DelSessionStatus(connId);
246 return DelConnect(connId);
247 }
248
GetSemaphore(int32_t connId)249 std::shared_ptr<BlockData<int32_t>> SoftBusAdapter::GetSemaphore(int32_t connId)
250 {
251 lock_guard<mutex> lock(statusMutex_);
252 if (sessionsStatus_.find(connId) == sessionsStatus_.end()) {
253 sessionsStatus_.emplace(connId, std::make_shared<BlockData<int32_t>>(WAIT_MAX_TIME, SOFTBUS_ERR));
254 }
255 return sessionsStatus_[connId];
256 }
257
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)258 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
259 __attribute__((unused)) const struct DeviceId &peer)
260 {
261 ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
262 KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
263 return true;
264 }
265
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)266 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
267 {
268 ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
269 flag_ = flag;
270 }
271
CreateSessionServerAdapter(const std::string & sessionName)272 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
273 {
274 ZLOGD("begin");
275 return CreateSessionServer("ohos.distributeddata", sessionName.c_str(), &sessionListener_);
276 }
277
RemoveSessionServerAdapter(const std::string & sessionName) const278 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
279 {
280 ZLOGD("begin");
281 return RemoveSessionServer("ohos.distributeddata", sessionName.c_str());
282 }
283
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)284 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
285 const PipeInfo &pipeInfo)
286 {
287 ZLOGD("begin");
288 auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
289 [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
290 ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
291 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
292 DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
293 value->OnMessage(deviceInfo, data, size, pipeInfo);
294 TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
295 Reporter::GetInstance()->TrafficStatistic()->Report(ts);
296 return true;
297 });
298 if (!ret) {
299 ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
300 }
301 }
302
Broadcast(const PipeInfo & pipeInfo,uint16_t mask)303 int32_t SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, uint16_t mask)
304 {
305 return SetNodeDataChangeFlag(pipeInfo.pipeId.c_str(), DmAdapter::GetInstance().GetLocalDevice().networkId.c_str(),
306 mask);
307 }
308
OnBroadcast(const DeviceId & device,uint16_t mask)309 void SoftBusAdapter::OnBroadcast(const DeviceId &device, uint16_t mask)
310 {
311 ZLOGI("device:%{public}s mask:0x%{public}x", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str(), mask);
312 if (!onBroadcast_) {
313 ZLOGW("no listener device:%{public}s mask:0x%{public}x",
314 KvStoreUtils::ToBeAnonymous(device.deviceId).c_str(), mask);
315 return;
316 }
317 onBroadcast_(device.deviceId, mask);
318 }
319
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,uint16_t)> listener)320 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
321 std::function<void(const std::string &, uint16_t)> listener)
322 {
323 if (onBroadcast_) {
324 return SOFTBUS_ALREADY_EXISTED;
325 }
326 onBroadcast_ = std::move(listener);
327 return RegNodeDeviceStateCb(pipeInfo.pipeId.c_str(), &g_callback);
328 }
329
SetDataHandler(SoftBusAdapter * handler)330 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
331 {
332 ZLOGI("begin");
333 softBusAdapter_ = handler;
334 }
335
GetConnDetailsInfo(int connId,ConnDetailsInfo & connInfo)336 int AppDataListenerWrap::GetConnDetailsInfo(int connId, ConnDetailsInfo &connInfo)
337 {
338 if (connId < 0) {
339 return SESSION_ID_INVALID;
340 }
341
342 int ret = GetMySessionName(connId, connInfo.myName, sizeof(connInfo.myName));
343 if (ret != SOFTBUS_OK) {
344 return MY_SESSION_NAME_INVALID;
345 }
346
347 ret = GetPeerSessionName(connId, connInfo.peerName, sizeof(connInfo.peerName));
348 if (ret != SOFTBUS_OK) {
349 return PEER_SESSION_NAME_INVALID;
350 }
351
352 char peerDevId[DEVICE_ID_SIZE_MAX] = "";
353 ret = GetPeerDeviceId(connId, peerDevId, sizeof(peerDevId));
354 if (ret != SOFTBUS_OK) {
355 return PEER_DEVICE_ID_INVALID;
356 }
357 connInfo.peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(peerDevId));
358
359 connInfo.side = GetSessionSide(connId);
360 if (connInfo.side < 0) {
361 return SESSION_SIDE_INVALID;
362 }
363
364 int32_t routeType = RouteType::INVALID_ROUTE_TYPE;
365 ret = GetSessionOption(connId, SESSION_OPTION_LINK_TYPE, &routeType, sizeof(routeType));
366 if (ret != SOFTBUS_OK) {
367 return ROUTE_TYPE_INVALID;
368 }
369 connInfo.routeType = routeType;
370
371 return SOFTBUS_OK;
372 }
373
OnConnectOpened(int connId,int result)374 int AppDataListenerWrap::OnConnectOpened(int connId, int result)
375 {
376 ZLOGI("[SessionOpen] connId:%{public}d, result:%{public}d", connId, result);
377 softBusAdapter_->OnSessionOpen(connId, result);
378 if (result != SOFTBUS_OK) {
379 ZLOGW("session %{public}d open failed, result:%{public}d.", connId, result);
380 return result;
381 }
382
383 ConnDetailsInfo connInfo;
384 int ret = GetConnDetailsInfo(connId, connInfo);
385 if (ret != SOFTBUS_OK) {
386 ZLOGE("[SessionOpened] session id:%{public}d get info fail error: %{public}d", connId, ret);
387 return ret;
388 }
389
390 ZLOGD("[OnConnectOpened] conn id:%{public}d, my name:%{public}s, peer name:%{public}s, "
391 "peer devId:%{public}s, side:%{public}d, routeType:%{public}d", connId, connInfo.myName, connInfo.peerName,
392 KvStoreUtils::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side, connInfo.routeType);
393 return 0;
394 }
395
OnConnectClosed(int connId)396 void AppDataListenerWrap::OnConnectClosed(int connId)
397 {
398 // when the local close the session, this callback function will not be triggered;
399 // when the current function is called, soft bus has released the session resource, only connId is valid;
400 std::string name = softBusAdapter_->OnSessionClose(connId);
401 ZLOGI("[SessionClosed] connId:%{public}d, name:%{public}s", connId, KvStoreUtils::ToBeAnonymous(name).c_str());
402 }
403
OnBytesReceived(int connId,const void * data,unsigned int dataLen)404 void AppDataListenerWrap::OnBytesReceived(int connId, const void *data, unsigned int dataLen)
405 {
406 ConnDetailsInfo connInfo;
407 int ret = GetConnDetailsInfo(connId, connInfo);
408 if (ret != SOFTBUS_OK) {
409 ZLOGE("[OnBytesReceived] session id:%{public}d get info fail error: %{public}d", connId, ret);
410 return;
411 }
412
413 ZLOGD("[OnBytesReceived] conn id:%{public}d, peer name:%{public}s, "
414 "peer devId:%{public}s, side:%{public}d, data len:%{public}u", connId, connInfo.peerName,
415 KvStoreUtils::ToBeAnonymous(connInfo.peerDevUuid).c_str(), connInfo.side, dataLen);
416
417 NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, connInfo.peerDevUuid,
418 { std::string(connInfo.peerName), "" });
419 }
420
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)421 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
422 const PipeInfo &pipeInfo)
423 {
424 softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
425 }
426
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const427 void SoftBusAdapter::SofBusDeviceChangeListenerImpl::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
428 const AppDistributedKv::DeviceChangeType &type) const
429 {
430 Strategy strategy = Strategy::BUTT;
431 switch (type) {
432 case AppDistributedKv::DeviceChangeType::DEVICE_ONLINE:
433 strategy = Strategy::ON_LINE_SELECT_CHANNEL;
434 break;
435 case AppDistributedKv::DeviceChangeType::DEVICE_ONREADY:
436 strategy = Strategy::DEFAULT;
437 break;
438 default:
439 break;
440 }
441
442 if (strategy >= Strategy::BUTT) {
443 return;
444 }
445
446 CommunicationStrategy::GetInstance().SetStrategy(info.uuid, strategy,
447 [this](const std::string &deviceId, Strategy strategy) {
448 std::shared_ptr<SoftBusClient> conn = SoftBusAdapter::GetInstance()->GetConnect(deviceId);
449 if (conn != nullptr) {
450 conn->AfterStrategyUpdate(strategy);
451 }
452 });
453 }
454 } // namespace AppDistributedKv
455 } // namespace OHOS
456