1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "virtual_communicator_aggregator.h"
16
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter,
28 const std::shared_ptr<DBStatusAdapter> &statusAdapter)
29 {
30 return E_OK;
31 }
32
Finalize()33 void VirtualCommunicatorAggregator::Finalize()
34 {
35 }
36
37 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)38 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo,
39 const std::string &userId)
40 {
41 if (isEnable_) {
42 return AllocCommunicator(remoteDeviceId_, outErrorNo);
43 }
44 return nullptr;
45 }
46
AllocCommunicator(const LabelType & commLabel,int & outErrorNo,const std::string & userId)47 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo,
48 const std::string &userId)
49 {
50 LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
51 if (allocCommunicatorCallback_) {
52 allocCommunicatorCallback_(userId);
53 }
54 if (commLabel.size() != COMM_LABEL_LENGTH) {
55 outErrorNo = -E_INVALID_ARGS;
56 return nullptr;
57 }
58
59 if (isEnable_) {
60 return AllocCommunicator(remoteDeviceId_, outErrorNo);
61 }
62 return nullptr;
63 }
64
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)65 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
66 {
67 if (releaseCommunicatorCallback_) {
68 releaseCommunicatorCallback_(userId);
69 }
70 // Called in main thread only
71 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
72 OfflineDevice(communicator->GetDeviceId());
73 {
74 std::lock_guard<std::mutex> lock(communicatorsLock_);
75 communicators_.erase(communicator->GetDeviceId());
76 }
77 RefObject::KillAndDecObjRef(communicator);
78 communicator = nullptr;
79 }
80
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)81 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
82 const Finalizer &inOper)
83 {
84 onCommLack_ = onCommLack;
85 return E_OK;
86 }
87
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)88 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
89 {
90 onConnect_ = onConnect;
91 RunOnConnectCallback("deviceId", true);
92 return E_OK;
93 }
94
RunCommunicatorLackCallback(const LabelType & commLabel)95 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
96 {
97 if (onCommLack_) {
98 onCommLack_(commLabel, userId_);
99 }
100 }
101
RunOnConnectCallback(const std::string & target,bool isConnect)102 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
103 {
104 if (onConnect_) {
105 onConnect_(target, isConnect);
106 }
107 }
108
GetLocalIdentity(std::string & outTarget) const109 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
110 {
111 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
112 if (localDeviceId_.empty()) {
113 outTarget = "DEVICES_A";
114 } else {
115 outTarget = localDeviceId_;
116 }
117 return getLocalDeviceRet_;
118 }
119
OnlineDevice(const std::string & deviceId) const120 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
121 {
122 if (!isEnable_) {
123 return;
124 }
125
126 // Called in main thread only
127 for (const auto &iter : communicators_) {
128 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
129 if (iter.first != deviceId) {
130 communicatorTmp->CallbackOnConnect(deviceId, true);
131 }
132 }
133 }
134
OfflineDevice(const std::string & deviceId) const135 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
136 {
137 if (!isEnable_) {
138 return;
139 }
140
141 // Called in main thread only
142 for (const auto &iter : communicators_) {
143 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
144 if (iter.first != deviceId) {
145 communicatorTmp->CallbackOnConnect(deviceId, false);
146 }
147 }
148 }
149
AllocCommunicator(const std::string & deviceId,int & outErrorNo)150 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
151 {
152 // Called in main thread only
153 VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
154 if (communicator == nullptr) {
155 outErrorNo = -E_OUT_OF_MEMORY;
156 }
157 {
158 std::lock_guard<std::mutex> lock(communicatorsLock_);
159 communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
160 }
161 OnlineDevice(deviceId);
162 return communicator;
163 }
164
GetCommunicator(const std::string & deviceId) const165 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
166 {
167 std::lock_guard<std::mutex> lock(communicatorsLock_);
168 auto iter = communicators_.find(deviceId);
169 if (iter != communicators_.end()) {
170 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
171 return communicator;
172 }
173 return nullptr;
174 }
175
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)176 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
177 const Message *inMsg, const OnSendEnd &onEnd)
178 {
179 if (VirtualCommunicatorAggregator::GetBlockValue()) {
180 std::unique_lock<std::mutex> lock(blockLock_);
181 conditionVar_.wait(lock);
182 }
183
184 if (!isEnable_) {
185 LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
186 delete inMsg;
187 inMsg = nullptr;
188 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
189 }
190 if (beforeDispatch_) {
191 beforeDispatch_(dstTarget, inMsg);
192 }
193 DispatchMessageInner(srcTarget, dstTarget, inMsg, onEnd);
194 }
195
DispatchMessageInner(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)196 void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget,
197 const Message *inMsg, const OnSendEnd &onEnd)
198 {
199 std::lock_guard<std::mutex> lock(communicatorsLock_);
200 auto iter = communicators_.find(dstTarget);
201 if (iter != communicators_.end()) {
202 LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
203 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
204 if (!communicator->IsEnabled()) {
205 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
206 delete inMsg;
207 inMsg = nullptr;
208 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
209 }
210 uint32_t messageId = inMsg->GetMessageId();
211 Message *msg = const_cast<Message *>(inMsg);
212 msg->SetTarget(srcTarget);
213 RefObject::IncObjRef(communicator);
214 auto onDispatch = onDispatch_;
215 bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
216 (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
217 uint32_t sendDelayTime = sendDelayTime_;
218 std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
219 if (isNeedDelay) {
220 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
221 }
222 if (onDispatch) {
223 onDispatch(dstTarget, msg);
224 }
225 communicator->CallbackOnMessage(srcTarget, msg);
226 RefObject::DecObjRef(communicator);
227 });
228 DelayTimeHandle(messageId, dstTarget);
229 thread.detach();
230 CallSendEnd(E_OK, onEnd);
231 } else {
232 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
233 delete inMsg;
234 inMsg = nullptr;
235 CallSendEnd(-E_NOT_FOUND, onEnd);
236 }
237 }
238
SetBlockValue(bool value)239 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
240 {
241 std::unique_lock<std::mutex> lock(blockLock_);
242 isBlock_ = value;
243 if (!value) {
244 conditionVar_.notify_all();
245 }
246 }
247
GetBlockValue() const248 bool VirtualCommunicatorAggregator::GetBlockValue() const
249 {
250 return isBlock_;
251 }
252
Disable()253 void VirtualCommunicatorAggregator::Disable()
254 {
255 isEnable_ = false;
256 }
257
Enable()258 void VirtualCommunicatorAggregator::Enable()
259 {
260 LOGD("[VirtualCommunicatorAggregator] enable");
261 isEnable_ = true;
262 }
263
CallSendEnd(int errCode,const OnSendEnd & onEnd)264 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
265 {
266 if (commErrCodeMock_ != E_OK) {
267 errCode = commErrCodeMock_;
268 }
269 if (onEnd) {
270 (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd, this]() {
271 onEnd(errCode, isDirectEnd_);
272 });
273 }
274 }
275
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)276 void VirtualCommunicatorAggregator::RegOnDispatch(
277 const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
278 {
279 onDispatch_ = onDispatch;
280 }
281
SetCurrentUserId(const std::string & userId)282 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
283 {
284 userId_ = userId;
285 }
286
SetTimeout(const std::string & deviceId,uint32_t timeout)287 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
288 {
289 std::lock_guard<std::mutex> lock(communicatorsLock_);
290 if (communicators_.find(deviceId) != communicators_.end()) {
291 communicators_[deviceId]->SetTimeout(timeout);
292 }
293 }
294
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)295 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
296 uint32_t dropTimes)
297 {
298 std::lock_guard<std::mutex> lock(communicatorsLock_);
299 if (communicators_.find(deviceId) != communicators_.end()) {
300 communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
301 }
302 }
303
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)304 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
305 {
306 std::lock_guard<std::mutex> lock(communicatorsLock_);
307 if (communicators_.find(deviceId) != communicators_.end()) {
308 communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
309 }
310 }
311
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)312 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
313 uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
314 {
315 sendDelayTime_ = sendDelayTime;
316 delayMessageId_ = delayMessageId;
317 delayTimes_ = delayTimes;
318 delayDevices_ = delayDevices;
319 skipTimes_ = skipTimes;
320 }
321
ResetSendDelayInfo()322 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
323 {
324 sendDelayTime_ = 0;
325 delayMessageId_ = INVALID_MESSAGE_ID;
326 delayTimes_ = 0;
327 skipTimes_ = 0;
328 delayDevices_.clear();
329 }
330
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)331 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
332 {
333 if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
334 (delayDevices_.count(dstTarget) > 0)) {
335 delayTimes_--;
336 }
337 if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
338 skipTimes_--;
339 }
340 }
341
GetOnlineDevices()342 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
343 {
344 std::lock_guard<std::mutex> lock(communicatorsLock_);
345 std::set<std::string> onlineDevices;
346 for (const auto &item: communicators_) {
347 onlineDevices.insert(item.first);
348 }
349 return onlineDevices;
350 }
351
DisableCommunicator()352 void VirtualCommunicatorAggregator::DisableCommunicator()
353 {
354 std::lock_guard<std::mutex> lock(communicatorsLock_);
355 for (const auto &communicator: communicators_) {
356 communicator.second->Disable();
357 }
358 }
359
EnableCommunicator()360 void VirtualCommunicatorAggregator::EnableCommunicator()
361 {
362 std::lock_guard<std::mutex> lock(communicatorsLock_);
363 for (const auto &communicator: communicators_) {
364 communicator.second->Disable();
365 }
366 }
367
RegBeforeDispatch(const std::function<void (const std::string &,const Message *)> & beforeDispatch)368 void VirtualCommunicatorAggregator::RegBeforeDispatch(
369 const std::function<void(const std::string &, const Message *)> &beforeDispatch)
370 {
371 beforeDispatch_ = beforeDispatch;
372 }
373
SetLocalDeviceId(const std::string & deviceId)374 void VirtualCommunicatorAggregator::SetLocalDeviceId(const std::string &deviceId)
375 {
376 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
377 localDeviceId_ = deviceId;
378 }
379
MockGetLocalDeviceRes(int mockRes)380 void VirtualCommunicatorAggregator::MockGetLocalDeviceRes(int mockRes)
381 {
382 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
383 getLocalDeviceRet_ = mockRes;
384 }
385
SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)386 void VirtualCommunicatorAggregator::SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)
387 {
388 allocCommunicatorCallback_ = allocCommunicatorCallback;
389 }
390
SetReleaseCommunicatorCallback(ReleaseCommunicatorCallback releaseCommunicatorCallback)391 void VirtualCommunicatorAggregator::SetReleaseCommunicatorCallback(
392 ReleaseCommunicatorCallback releaseCommunicatorCallback)
393 {
394 releaseCommunicatorCallback_ = releaseCommunicatorCallback;
395 }
396
MockCommErrCode(int mockErrCode)397 void VirtualCommunicatorAggregator::MockCommErrCode(int mockErrCode)
398 {
399 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
400 commErrCodeMock_ = mockErrCode;
401 }
402
MockDirectEndFlag(bool isDirectEnd)403 void VirtualCommunicatorAggregator::MockDirectEndFlag(bool isDirectEnd)
404 {
405 isDirectEnd_ = isDirectEnd;
406 }
407 } // namespace DistributedDB
408