/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "virtual_communicator_aggregator.h"
16
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter,
28 const std::shared_ptr<DBStatusAdapter> &statusAdapter)
29 {
30 return E_OK;
31 }
32
Finalize()33 void VirtualCommunicatorAggregator::Finalize()
34 {
35 }
36
37 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)38 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo,
39 const std::string &userId)
40 {
41 if (isEnable_) {
42 return AllocCommunicator(remoteDeviceId_, outErrorNo);
43 }
44 return nullptr;
45 }
46
AllocCommunicator(const LabelType & commLabel,int & outErrorNo,const std::string & userId)47 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo,
48 const std::string &userId)
49 {
50 LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
51 if (allocCommunicatorCallback_) {
52 allocCommunicatorCallback_(userId);
53 }
54 if (commLabel.size() != COMM_LABEL_LENGTH) {
55 outErrorNo = -E_INVALID_ARGS;
56 return nullptr;
57 }
58
59 if (isEnable_) {
60 return AllocCommunicator(remoteDeviceId_, outErrorNo);
61 }
62 return nullptr;
63 }
64
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)65 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
66 {
67 if (releaseCommunicatorCallback_) {
68 releaseCommunicatorCallback_(userId);
69 }
70 // Called in main thread only
71 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
72 OfflineDevice(communicator->GetDeviceId());
73 {
74 std::lock_guard<std::mutex> lock(communicatorsLock_);
75 communicators_.erase(communicator->GetDeviceId());
76 }
77 RefObject::KillAndDecObjRef(communicator);
78 communicator = nullptr;
79 }
80
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)81 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
82 const Finalizer &inOper)
83 {
84 onCommLack_ = onCommLack;
85 return E_OK;
86 }
87
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)88 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
89 {
90 onConnect_ = onConnect;
91 RunOnConnectCallback("deviceId", true);
92 return E_OK;
93 }
94
RunCommunicatorLackCallback(const LabelType & commLabel)95 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
96 {
97 if (onCommLack_) {
98 onCommLack_(commLabel, userId_);
99 }
100 }
101
RunOnConnectCallback(const std::string & target,bool isConnect)102 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
103 {
104 if (onConnect_) {
105 onConnect_(target, isConnect);
106 }
107 }
108
GetLocalIdentity(std::string & outTarget) const109 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
110 {
111 // no work with using virtual communicator
112 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
113 if (localDeviceId_.empty()) {
114 outTarget = "DEVICES_A";
115 } else {
116 outTarget = localDeviceId_;
117 }
118 return getLocalDeviceRet_;
119 }
120
OnlineDevice(const std::string & deviceId) const121 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
122 {
123 if (!isEnable_) {
124 return;
125 }
126
127 // Called in main thread only
128 for (const auto &iter : communicators_) {
129 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
130 if (iter.first != deviceId) {
131 communicatorTmp->CallbackOnConnect(deviceId, true);
132 }
133 }
134 }
135
OfflineDevice(const std::string & deviceId) const136 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
137 {
138 if (!isEnable_) {
139 return;
140 }
141
142 // Called in main thread only
143 for (const auto &iter : communicators_) {
144 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
145 if (iter.first != deviceId) {
146 communicatorTmp->CallbackOnConnect(deviceId, false);
147 }
148 }
149 }
150
AllocCommunicator(const std::string & deviceId,int & outErrorNo)151 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
152 {
153 // Called in main thread only
154 VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
155 if (communicator == nullptr) {
156 outErrorNo = -E_OUT_OF_MEMORY;
157 }
158 {
159 std::lock_guard<std::mutex> lock(communicatorsLock_);
160 communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
161 }
162 OnlineDevice(deviceId);
163 return communicator;
164 }
165
GetCommunicator(const std::string & deviceId) const166 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
167 {
168 std::lock_guard<std::mutex> lock(communicatorsLock_);
169 auto iter = communicators_.find(deviceId);
170 if (iter != communicators_.end()) {
171 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
172 return communicator;
173 }
174 return nullptr;
175 }
176
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)177 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
178 const Message *inMsg, const OnSendEnd &onEnd)
179 {
180 if (VirtualCommunicatorAggregator::GetBlockValue()) {
181 std::unique_lock<std::mutex> lock(blockLock_);
182 conditionVar_.wait(lock);
183 }
184
185 if (!isEnable_) {
186 LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
187 delete inMsg;
188 inMsg = nullptr;
189 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
190 }
191 if (beforeDispatch_) {
192 beforeDispatch_(dstTarget, inMsg);
193 }
194 DispatchMessageInner(srcTarget, dstTarget, inMsg, onEnd);
195 }
196
DispatchMessageInner(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)197 void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget,
198 const Message *inMsg, const OnSendEnd &onEnd)
199 {
200 std::lock_guard<std::mutex> lock(communicatorsLock_);
201 auto iter = communicators_.find(dstTarget);
202 if (iter != communicators_.end()) {
203 LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
204 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
205 if (!communicator->IsEnabled()) {
206 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
207 delete inMsg;
208 inMsg = nullptr;
209 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
210 }
211 uint32_t messageId = inMsg->GetMessageId();
212 Message *msg = const_cast<Message *>(inMsg);
213 msg->SetTarget(srcTarget);
214 msg->SetSenderUserId(communicator->GetTargetUserId({}));
215 RefObject::IncObjRef(communicator);
216 auto onDispatch = onDispatch_;
217 bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
218 (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
219 uint32_t sendDelayTime = sendDelayTime_;
220 std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
221 if (isNeedDelay) {
222 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
223 }
224 if (onDispatch) {
225 onDispatch(dstTarget, msg);
226 }
227 communicator->CallbackOnMessage(srcTarget, msg);
228 RefObject::DecObjRef(communicator);
229 });
230 DelayTimeHandle(messageId, dstTarget);
231 thread.detach();
232 CallSendEnd(E_OK, onEnd);
233 } else {
234 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
235 delete inMsg;
236 inMsg = nullptr;
237 CallSendEnd(-E_NOT_FOUND, onEnd);
238 }
239 }
240
SetBlockValue(bool value)241 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
242 {
243 std::unique_lock<std::mutex> lock(blockLock_);
244 isBlock_ = value;
245 if (!value) {
246 conditionVar_.notify_all();
247 }
248 }
249
GetBlockValue() const250 bool VirtualCommunicatorAggregator::GetBlockValue() const
251 {
252 return isBlock_;
253 }
254
Disable()255 void VirtualCommunicatorAggregator::Disable()
256 {
257 isEnable_ = false;
258 }
259
Enable()260 void VirtualCommunicatorAggregator::Enable()
261 {
262 LOGD("[VirtualCommunicatorAggregator] enable");
263 isEnable_ = true;
264 }
265
CallSendEnd(int errCode,const OnSendEnd & onEnd)266 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
267 {
268 if (commErrCodeMock_ != E_OK) {
269 errCode = commErrCodeMock_;
270 }
271 if (onEnd) {
272 (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd, this]() {
273 onEnd(errCode, isDirectEnd_);
274 });
275 }
276 }
277
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)278 void VirtualCommunicatorAggregator::RegOnDispatch(
279 const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
280 {
281 onDispatch_ = onDispatch;
282 }
283
SetCurrentUserId(const std::string & userId)284 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
285 {
286 userId_ = userId;
287 }
288
SetTimeout(const std::string & deviceId,uint32_t timeout)289 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
290 {
291 std::lock_guard<std::mutex> lock(communicatorsLock_);
292 if (communicators_.find(deviceId) != communicators_.end()) {
293 communicators_[deviceId]->SetTimeout(timeout);
294 }
295 }
296
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)297 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
298 uint32_t dropTimes)
299 {
300 std::lock_guard<std::mutex> lock(communicatorsLock_);
301 if (communicators_.find(deviceId) != communicators_.end()) {
302 communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
303 }
304 }
305
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)306 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
307 {
308 std::lock_guard<std::mutex> lock(communicatorsLock_);
309 if (communicators_.find(deviceId) != communicators_.end()) {
310 communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
311 }
312 }
313
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)314 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
315 uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
316 {
317 sendDelayTime_ = sendDelayTime;
318 delayMessageId_ = delayMessageId;
319 delayTimes_ = delayTimes;
320 delayDevices_ = delayDevices;
321 skipTimes_ = skipTimes;
322 }
323
ResetSendDelayInfo()324 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
325 {
326 sendDelayTime_ = 0;
327 delayMessageId_ = INVALID_MESSAGE_ID;
328 delayTimes_ = 0;
329 skipTimes_ = 0;
330 delayDevices_.clear();
331 }
332
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)333 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
334 {
335 if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
336 (delayDevices_.count(dstTarget) > 0)) {
337 delayTimes_--;
338 }
339 if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
340 skipTimes_--;
341 }
342 }
343
GetOnlineDevices()344 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
345 {
346 std::lock_guard<std::mutex> lock(communicatorsLock_);
347 std::set<std::string> onlineDevices;
348 for (const auto &item: communicators_) {
349 onlineDevices.insert(item.first);
350 }
351 return onlineDevices;
352 }
353
DisableCommunicator()354 void VirtualCommunicatorAggregator::DisableCommunicator()
355 {
356 std::lock_guard<std::mutex> lock(communicatorsLock_);
357 for (const auto &communicator: communicators_) {
358 communicator.second->Disable();
359 }
360 }
361
EnableCommunicator()362 void VirtualCommunicatorAggregator::EnableCommunicator()
363 {
364 std::lock_guard<std::mutex> lock(communicatorsLock_);
365 for (const auto &communicator: communicators_) {
366 communicator.second->Disable();
367 }
368 }
369
RegBeforeDispatch(const std::function<void (const std::string &,const Message *)> & beforeDispatch)370 void VirtualCommunicatorAggregator::RegBeforeDispatch(
371 const std::function<void(const std::string &, const Message *)> &beforeDispatch)
372 {
373 beforeDispatch_ = beforeDispatch;
374 }
375
SetLocalDeviceId(const std::string & deviceId)376 void VirtualCommunicatorAggregator::SetLocalDeviceId(const std::string &deviceId)
377 {
378 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
379 localDeviceId_ = deviceId;
380 }
381
MockGetLocalDeviceRes(int mockRes)382 void VirtualCommunicatorAggregator::MockGetLocalDeviceRes(int mockRes)
383 {
384 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
385 getLocalDeviceRet_ = mockRes;
386 }
387
SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)388 void VirtualCommunicatorAggregator::SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)
389 {
390 allocCommunicatorCallback_ = allocCommunicatorCallback;
391 }
392
SetReleaseCommunicatorCallback(ReleaseCommunicatorCallback releaseCommunicatorCallback)393 void VirtualCommunicatorAggregator::SetReleaseCommunicatorCallback(
394 ReleaseCommunicatorCallback releaseCommunicatorCallback)
395 {
396 releaseCommunicatorCallback_ = releaseCommunicatorCallback;
397 }
398
MockCommErrCode(int mockErrCode)399 void VirtualCommunicatorAggregator::MockCommErrCode(int mockErrCode)
400 {
401 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
402 commErrCodeMock_ = mockErrCode;
403 }
404
MockDirectEndFlag(bool isDirectEnd)405 void VirtualCommunicatorAggregator::MockDirectEndFlag(bool isDirectEnd)
406 {
407 isDirectEnd_ = isDirectEnd;
408 }
409
ClearOnlineLabel()410 void VirtualCommunicatorAggregator::ClearOnlineLabel()
411 {
412 }
413
SetRemoteDeviceId(const std::string & dev)414 void VirtualCommunicatorAggregator::SetRemoteDeviceId(const std::string &dev)
415 {
416 std::lock_guard<std::mutex> autoLock(communicatorsLock_);
417 remoteDeviceId_ = dev;
418 LOGI("[VirtualCommunicatorAggregator] Set dev %s", dev.c_str());
419 }
420
GetAllSendMsgSize() const421 uint64_t VirtualCommunicatorAggregator::GetAllSendMsgSize() const
422 {
423 uint64_t size = 0;
424 std::lock_guard<std::mutex> lock(communicatorsLock_);
425 for (const auto &communicator : communicators_) {
426 size += communicator.second->GetSendMsgSize();
427 }
428 return size;
429 }
430 } // namespace DistributedDB
431