• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "virtual_communicator_aggregator.h"
16 
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25 
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter,
28     const std::shared_ptr<DBStatusAdapter> &statusAdapter)
29 {
30     return E_OK;
31 }
32 
Finalize()33 void VirtualCommunicatorAggregator::Finalize()
34 {
35 }
36 
37 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)38 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo,
39     const std::string &userId)
40 {
41     if (isEnable_) {
42         return AllocCommunicator(remoteDeviceId_, outErrorNo);
43     }
44     return nullptr;
45 }
46 
AllocCommunicator(const LabelType & commLabel,int & outErrorNo,const std::string & userId)47 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo,
48     const std::string &userId)
49 {
50     LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
51     if (allocCommunicatorCallback_) {
52         allocCommunicatorCallback_(userId);
53     }
54     if (commLabel.size() != COMM_LABEL_LENGTH) {
55         outErrorNo = -E_INVALID_ARGS;
56         return nullptr;
57     }
58 
59     if (isEnable_) {
60         return AllocCommunicator(remoteDeviceId_, outErrorNo);
61     }
62     return nullptr;
63 }
64 
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)65 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
66 {
67     if (releaseCommunicatorCallback_) {
68         releaseCommunicatorCallback_(userId);
69     }
70     // Called in main thread only
71     VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
72     OfflineDevice(communicator->GetDeviceId());
73     {
74         std::lock_guard<std::mutex> lock(communicatorsLock_);
75         communicators_.erase(communicator->GetDeviceId());
76     }
77     RefObject::KillAndDecObjRef(communicator);
78     communicator = nullptr;
79 }
80 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)81 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
82     const Finalizer &inOper)
83 {
84     onCommLack_ = onCommLack;
85     return E_OK;
86 }
87 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)88 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
89 {
90     onConnect_ = onConnect;
91     RunOnConnectCallback("deviceId", true);
92     return E_OK;
93 }
94 
RunCommunicatorLackCallback(const LabelType & commLabel)95 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
96 {
97     if (onCommLack_) {
98         onCommLack_(commLabel, userId_);
99     }
100 }
101 
RunOnConnectCallback(const std::string & target,bool isConnect)102 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
103 {
104     if (onConnect_) {
105         onConnect_(target, isConnect);
106     }
107 }
108 
GetLocalIdentity(std::string & outTarget) const109 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
110 {
111     // no work with using virtual communicator
112     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
113     if (localDeviceId_.empty()) {
114         outTarget = "DEVICES_A";
115     } else {
116         outTarget = localDeviceId_;
117     }
118     return getLocalDeviceRet_;
119 }
120 
OnlineDevice(const std::string & deviceId) const121 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
122 {
123     if (!isEnable_) {
124         return;
125     }
126 
127     // Called in main thread only
128     for (const auto &iter : communicators_) {
129         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
130         if (iter.first != deviceId) {
131             communicatorTmp->CallbackOnConnect(deviceId, true);
132         }
133     }
134 }
135 
OfflineDevice(const std::string & deviceId) const136 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
137 {
138     if (!isEnable_) {
139         return;
140     }
141 
142     // Called in main thread only
143     for (const auto &iter : communicators_) {
144         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
145         if (iter.first != deviceId) {
146             communicatorTmp->CallbackOnConnect(deviceId, false);
147         }
148     }
149 }
150 
AllocCommunicator(const std::string & deviceId,int & outErrorNo)151 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
152 {
153     // Called in main thread only
154     VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
155     if (communicator == nullptr) {
156         outErrorNo = -E_OUT_OF_MEMORY;
157     }
158     {
159         std::lock_guard<std::mutex> lock(communicatorsLock_);
160         communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
161     }
162     OnlineDevice(deviceId);
163     return communicator;
164 }
165 
GetCommunicator(const std::string & deviceId) const166 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
167 {
168     std::lock_guard<std::mutex> lock(communicatorsLock_);
169     auto iter = communicators_.find(deviceId);
170     if (iter != communicators_.end()) {
171         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
172         return communicator;
173     }
174     return nullptr;
175 }
176 
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)177 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
178     const Message *inMsg, const OnSendEnd &onEnd)
179 {
180     if (VirtualCommunicatorAggregator::GetBlockValue()) {
181         std::unique_lock<std::mutex> lock(blockLock_);
182         conditionVar_.wait(lock);
183     }
184 
185     if (!isEnable_) {
186         LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
187         delete inMsg;
188         inMsg = nullptr;
189         return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
190     }
191     if (beforeDispatch_) {
192         beforeDispatch_(dstTarget, inMsg);
193     }
194     DispatchMessageInner(srcTarget, dstTarget, inMsg, onEnd);
195 }
196 
DispatchMessageInner(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)197 void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget,
198     const Message *inMsg, const OnSendEnd &onEnd)
199 {
200     std::lock_guard<std::mutex> lock(communicatorsLock_);
201     auto iter = communicators_.find(dstTarget);
202     if (iter != communicators_.end()) {
203         LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
204         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
205         if (!communicator->IsEnabled()) {
206             LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
207             delete inMsg;
208             inMsg = nullptr;
209             return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
210         }
211         uint32_t messageId = inMsg->GetMessageId();
212         Message *msg = const_cast<Message *>(inMsg);
213         msg->SetTarget(srcTarget);
214         msg->SetSenderUserId(communicator->GetTargetUserId({}));
215         RefObject::IncObjRef(communicator);
216         auto onDispatch = onDispatch_;
217         bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
218             (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
219         uint32_t sendDelayTime = sendDelayTime_;
220         std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
221             if (isNeedDelay) {
222                 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
223             }
224             if (onDispatch) {
225                 onDispatch(dstTarget, msg);
226             }
227             communicator->CallbackOnMessage(srcTarget, msg);
228             RefObject::DecObjRef(communicator);
229         });
230         DelayTimeHandle(messageId, dstTarget);
231         thread.detach();
232         CallSendEnd(E_OK, onEnd);
233     } else {
234         LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
235         delete inMsg;
236         inMsg = nullptr;
237         CallSendEnd(-E_NOT_FOUND, onEnd);
238     }
239 }
240 
SetBlockValue(bool value)241 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
242 {
243     std::unique_lock<std::mutex> lock(blockLock_);
244     isBlock_ = value;
245     if (!value) {
246         conditionVar_.notify_all();
247     }
248 }
249 
GetBlockValue() const250 bool VirtualCommunicatorAggregator::GetBlockValue() const
251 {
252     return isBlock_;
253 }
254 
Disable()255 void VirtualCommunicatorAggregator::Disable()
256 {
257     isEnable_ = false;
258 }
259 
Enable()260 void VirtualCommunicatorAggregator::Enable()
261 {
262     LOGD("[VirtualCommunicatorAggregator] enable");
263     isEnable_ = true;
264 }
265 
CallSendEnd(int errCode,const OnSendEnd & onEnd)266 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
267 {
268     if (commErrCodeMock_ != E_OK) {
269         errCode = commErrCodeMock_;
270     }
271     if (onEnd) {
272         (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd, this]() {
273             onEnd(errCode, isDirectEnd_);
274         });
275     }
276 }
277 
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)278 void VirtualCommunicatorAggregator::RegOnDispatch(
279     const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
280 {
281     onDispatch_ = onDispatch;
282 }
283 
SetCurrentUserId(const std::string & userId)284 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
285 {
286     userId_ = userId;
287 }
288 
SetTimeout(const std::string & deviceId,uint32_t timeout)289 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
290 {
291     std::lock_guard<std::mutex> lock(communicatorsLock_);
292     if (communicators_.find(deviceId) != communicators_.end()) {
293         communicators_[deviceId]->SetTimeout(timeout);
294     }
295 }
296 
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)297 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
298     uint32_t dropTimes)
299 {
300     std::lock_guard<std::mutex> lock(communicatorsLock_);
301     if (communicators_.find(deviceId) != communicators_.end()) {
302         communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
303     }
304 }
305 
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)306 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
307 {
308     std::lock_guard<std::mutex> lock(communicatorsLock_);
309     if (communicators_.find(deviceId) != communicators_.end()) {
310         communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
311     }
312 }
313 
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)314 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
315     uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
316 {
317     sendDelayTime_ = sendDelayTime;
318     delayMessageId_ = delayMessageId;
319     delayTimes_ = delayTimes;
320     delayDevices_ = delayDevices;
321     skipTimes_ = skipTimes;
322 }
323 
ResetSendDelayInfo()324 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
325 {
326     sendDelayTime_ = 0;
327     delayMessageId_ = INVALID_MESSAGE_ID;
328     delayTimes_ = 0;
329     skipTimes_ = 0;
330     delayDevices_.clear();
331 }
332 
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)333 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
334 {
335     if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
336         (delayDevices_.count(dstTarget) > 0)) {
337         delayTimes_--;
338     }
339     if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
340         skipTimes_--;
341     }
342 }
343 
GetOnlineDevices()344 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
345 {
346     std::lock_guard<std::mutex> lock(communicatorsLock_);
347     std::set<std::string> onlineDevices;
348     for (const auto &item: communicators_) {
349         onlineDevices.insert(item.first);
350     }
351     return onlineDevices;
352 }
353 
DisableCommunicator()354 void VirtualCommunicatorAggregator::DisableCommunicator()
355 {
356     std::lock_guard<std::mutex> lock(communicatorsLock_);
357     for (const auto &communicator: communicators_) {
358         communicator.second->Disable();
359     }
360 }
361 
EnableCommunicator()362 void VirtualCommunicatorAggregator::EnableCommunicator()
363 {
364     std::lock_guard<std::mutex> lock(communicatorsLock_);
365     for (const auto &communicator: communicators_) {
366         communicator.second->Disable();
367     }
368 }
369 
RegBeforeDispatch(const std::function<void (const std::string &,const Message *)> & beforeDispatch)370 void VirtualCommunicatorAggregator::RegBeforeDispatch(
371     const std::function<void(const std::string &, const Message *)> &beforeDispatch)
372 {
373     beforeDispatch_ = beforeDispatch;
374 }
375 
SetLocalDeviceId(const std::string & deviceId)376 void VirtualCommunicatorAggregator::SetLocalDeviceId(const std::string &deviceId)
377 {
378     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
379     localDeviceId_ = deviceId;
380 }
381 
MockGetLocalDeviceRes(int mockRes)382 void VirtualCommunicatorAggregator::MockGetLocalDeviceRes(int mockRes)
383 {
384     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
385     getLocalDeviceRet_ = mockRes;
386 }
387 
SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)388 void VirtualCommunicatorAggregator::SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)
389 {
390     allocCommunicatorCallback_ = allocCommunicatorCallback;
391 }
392 
SetReleaseCommunicatorCallback(ReleaseCommunicatorCallback releaseCommunicatorCallback)393 void VirtualCommunicatorAggregator::SetReleaseCommunicatorCallback(
394     ReleaseCommunicatorCallback releaseCommunicatorCallback)
395 {
396     releaseCommunicatorCallback_ = releaseCommunicatorCallback;
397 }
398 
MockCommErrCode(int mockErrCode)399 void VirtualCommunicatorAggregator::MockCommErrCode(int mockErrCode)
400 {
401     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
402     commErrCodeMock_ = mockErrCode;
403 }
404 
MockDirectEndFlag(bool isDirectEnd)405 void VirtualCommunicatorAggregator::MockDirectEndFlag(bool isDirectEnd)
406 {
407     isDirectEnd_ = isDirectEnd;
408 }
409 
ClearOnlineLabel()410 void VirtualCommunicatorAggregator::ClearOnlineLabel()
411 {
412 }
413 
SetRemoteDeviceId(const std::string & dev)414 void VirtualCommunicatorAggregator::SetRemoteDeviceId(const std::string &dev)
415 {
416     std::lock_guard<std::mutex> autoLock(communicatorsLock_);
417     remoteDeviceId_ = dev;
418     LOGI("[VirtualCommunicatorAggregator] Set dev %s", dev.c_str());
419 }
420 
GetAllSendMsgSize() const421 uint64_t VirtualCommunicatorAggregator::GetAllSendMsgSize() const
422 {
423     uint64_t size = 0;
424     std::lock_guard<std::mutex> lock(communicatorsLock_);
425     for (const auto &communicator : communicators_) {
426         size += communicator.second->GetSendMsgSize();
427     }
428     return size;
429 }
430 } // namespace DistributedDB
431