• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "virtual_communicator_aggregator.h"
16 
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25 
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter,
28     const std::shared_ptr<DBStatusAdapter> &statusAdapter)
29 {
30     return E_OK;
31 }
32 
Finalize()33 void VirtualCommunicatorAggregator::Finalize()
34 {
35 }
36 
37 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)38 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo,
39     const std::string &userId)
40 {
41     if (isEnable_) {
42         return AllocCommunicator(remoteDeviceId_, outErrorNo);
43     }
44     return nullptr;
45 }
46 
AllocCommunicator(const LabelType & commLabel,int & outErrorNo,const std::string & userId)47 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo,
48     const std::string &userId)
49 {
50     LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
51     if (allocCommunicatorCallback_) {
52         allocCommunicatorCallback_(userId);
53     }
54     if (commLabel.size() != COMM_LABEL_LENGTH) {
55         outErrorNo = -E_INVALID_ARGS;
56         return nullptr;
57     }
58 
59     if (isEnable_) {
60         return AllocCommunicator(remoteDeviceId_, outErrorNo);
61     }
62     return nullptr;
63 }
64 
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)65 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
66 {
67     if (releaseCommunicatorCallback_) {
68         releaseCommunicatorCallback_(userId);
69     }
70     // Called in main thread only
71     VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
72     OfflineDevice(communicator->GetDeviceId());
73     {
74         std::lock_guard<std::mutex> lock(communicatorsLock_);
75         communicators_.erase(communicator->GetDeviceId());
76     }
77     RefObject::KillAndDecObjRef(communicator);
78     communicator = nullptr;
79 }
80 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)81 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
82     const Finalizer &inOper)
83 {
84     onCommLack_ = onCommLack;
85     return E_OK;
86 }
87 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)88 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
89 {
90     onConnect_ = onConnect;
91     RunOnConnectCallback("deviceId", true);
92     return E_OK;
93 }
94 
RunCommunicatorLackCallback(const LabelType & commLabel)95 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
96 {
97     if (onCommLack_) {
98         onCommLack_(commLabel, userId_);
99     }
100 }
101 
RunOnConnectCallback(const std::string & target,bool isConnect)102 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
103 {
104     if (onConnect_) {
105         onConnect_(target, isConnect);
106     }
107 }
108 
GetLocalIdentity(std::string & outTarget) const109 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
110 {
111     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
112     if (localDeviceId_.empty()) {
113         outTarget = "DEVICES_A";
114     } else {
115         outTarget = localDeviceId_;
116     }
117     return getLocalDeviceRet_;
118 }
119 
OnlineDevice(const std::string & deviceId) const120 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
121 {
122     if (!isEnable_) {
123         return;
124     }
125 
126     // Called in main thread only
127     for (const auto &iter : communicators_) {
128         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
129         if (iter.first != deviceId) {
130             communicatorTmp->CallbackOnConnect(deviceId, true);
131         }
132     }
133 }
134 
OfflineDevice(const std::string & deviceId) const135 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
136 {
137     if (!isEnable_) {
138         return;
139     }
140 
141     // Called in main thread only
142     for (const auto &iter : communicators_) {
143         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
144         if (iter.first != deviceId) {
145             communicatorTmp->CallbackOnConnect(deviceId, false);
146         }
147     }
148 }
149 
AllocCommunicator(const std::string & deviceId,int & outErrorNo)150 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
151 {
152     // Called in main thread only
153     VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
154     if (communicator == nullptr) {
155         outErrorNo = -E_OUT_OF_MEMORY;
156     }
157     {
158         std::lock_guard<std::mutex> lock(communicatorsLock_);
159         communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
160     }
161     OnlineDevice(deviceId);
162     return communicator;
163 }
164 
GetCommunicator(const std::string & deviceId) const165 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
166 {
167     std::lock_guard<std::mutex> lock(communicatorsLock_);
168     auto iter = communicators_.find(deviceId);
169     if (iter != communicators_.end()) {
170         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
171         return communicator;
172     }
173     return nullptr;
174 }
175 
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)176 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
177     const Message *inMsg, const OnSendEnd &onEnd)
178 {
179     if (VirtualCommunicatorAggregator::GetBlockValue()) {
180         std::unique_lock<std::mutex> lock(blockLock_);
181         conditionVar_.wait(lock);
182     }
183 
184     if (!isEnable_) {
185         LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
186         delete inMsg;
187         inMsg = nullptr;
188         return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
189     }
190     if (beforeDispatch_) {
191         beforeDispatch_(dstTarget, inMsg);
192     }
193     DispatchMessageInner(srcTarget, dstTarget, inMsg, onEnd);
194 }
195 
DispatchMessageInner(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)196 void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget,
197     const Message *inMsg, const OnSendEnd &onEnd)
198 {
199     std::lock_guard<std::mutex> lock(communicatorsLock_);
200     auto iter = communicators_.find(dstTarget);
201     if (iter != communicators_.end()) {
202         LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
203         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
204         if (!communicator->IsEnabled()) {
205             LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
206             delete inMsg;
207             inMsg = nullptr;
208             return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
209         }
210         uint32_t messageId = inMsg->GetMessageId();
211         Message *msg = const_cast<Message *>(inMsg);
212         msg->SetTarget(srcTarget);
213         RefObject::IncObjRef(communicator);
214         auto onDispatch = onDispatch_;
215         bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
216             (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
217         uint32_t sendDelayTime = sendDelayTime_;
218         std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
219             if (isNeedDelay) {
220                 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
221             }
222             if (onDispatch) {
223                 onDispatch(dstTarget, msg);
224             }
225             communicator->CallbackOnMessage(srcTarget, msg);
226             RefObject::DecObjRef(communicator);
227         });
228         DelayTimeHandle(messageId, dstTarget);
229         thread.detach();
230         CallSendEnd(E_OK, onEnd);
231     } else {
232         LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
233         delete inMsg;
234         inMsg = nullptr;
235         CallSendEnd(-E_NOT_FOUND, onEnd);
236     }
237 }
238 
SetBlockValue(bool value)239 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
240 {
241     std::unique_lock<std::mutex> lock(blockLock_);
242     isBlock_ = value;
243     if (!value) {
244         conditionVar_.notify_all();
245     }
246 }
247 
GetBlockValue() const248 bool VirtualCommunicatorAggregator::GetBlockValue() const
249 {
250     return isBlock_;
251 }
252 
Disable()253 void VirtualCommunicatorAggregator::Disable()
254 {
255     isEnable_ = false;
256 }
257 
Enable()258 void VirtualCommunicatorAggregator::Enable()
259 {
260     LOGD("[VirtualCommunicatorAggregator] enable");
261     isEnable_ = true;
262 }
263 
CallSendEnd(int errCode,const OnSendEnd & onEnd)264 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
265 {
266     if (commErrCodeMock_ != E_OK) {
267         errCode = commErrCodeMock_;
268     }
269     if (onEnd) {
270         (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd, this]() {
271             onEnd(errCode, isDirectEnd_);
272         });
273     }
274 }
275 
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)276 void VirtualCommunicatorAggregator::RegOnDispatch(
277     const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
278 {
279     onDispatch_ = onDispatch;
280 }
281 
SetCurrentUserId(const std::string & userId)282 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
283 {
284     userId_ = userId;
285 }
286 
SetTimeout(const std::string & deviceId,uint32_t timeout)287 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
288 {
289     std::lock_guard<std::mutex> lock(communicatorsLock_);
290     if (communicators_.find(deviceId) != communicators_.end()) {
291         communicators_[deviceId]->SetTimeout(timeout);
292     }
293 }
294 
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)295 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
296     uint32_t dropTimes)
297 {
298     std::lock_guard<std::mutex> lock(communicatorsLock_);
299     if (communicators_.find(deviceId) != communicators_.end()) {
300         communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
301     }
302 }
303 
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)304 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
305 {
306     std::lock_guard<std::mutex> lock(communicatorsLock_);
307     if (communicators_.find(deviceId) != communicators_.end()) {
308         communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
309     }
310 }
311 
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)312 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
313     uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
314 {
315     sendDelayTime_ = sendDelayTime;
316     delayMessageId_ = delayMessageId;
317     delayTimes_ = delayTimes;
318     delayDevices_ = delayDevices;
319     skipTimes_ = skipTimes;
320 }
321 
ResetSendDelayInfo()322 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
323 {
324     sendDelayTime_ = 0;
325     delayMessageId_ = INVALID_MESSAGE_ID;
326     delayTimes_ = 0;
327     skipTimes_ = 0;
328     delayDevices_.clear();
329 }
330 
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)331 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
332 {
333     if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
334         (delayDevices_.count(dstTarget) > 0)) {
335         delayTimes_--;
336     }
337     if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
338         skipTimes_--;
339     }
340 }
341 
GetOnlineDevices()342 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
343 {
344     std::lock_guard<std::mutex> lock(communicatorsLock_);
345     std::set<std::string> onlineDevices;
346     for (const auto &item: communicators_) {
347         onlineDevices.insert(item.first);
348     }
349     return onlineDevices;
350 }
351 
DisableCommunicator()352 void VirtualCommunicatorAggregator::DisableCommunicator()
353 {
354     std::lock_guard<std::mutex> lock(communicatorsLock_);
355     for (const auto &communicator: communicators_) {
356         communicator.second->Disable();
357     }
358 }
359 
EnableCommunicator()360 void VirtualCommunicatorAggregator::EnableCommunicator()
361 {
362     std::lock_guard<std::mutex> lock(communicatorsLock_);
363     for (const auto &communicator: communicators_) {
364         communicator.second->Disable();
365     }
366 }
367 
RegBeforeDispatch(const std::function<void (const std::string &,const Message *)> & beforeDispatch)368 void VirtualCommunicatorAggregator::RegBeforeDispatch(
369     const std::function<void(const std::string &, const Message *)> &beforeDispatch)
370 {
371     beforeDispatch_ = beforeDispatch;
372 }
373 
SetLocalDeviceId(const std::string & deviceId)374 void VirtualCommunicatorAggregator::SetLocalDeviceId(const std::string &deviceId)
375 {
376     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
377     localDeviceId_ = deviceId;
378 }
379 
MockGetLocalDeviceRes(int mockRes)380 void VirtualCommunicatorAggregator::MockGetLocalDeviceRes(int mockRes)
381 {
382     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
383     getLocalDeviceRet_ = mockRes;
384 }
385 
SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)386 void VirtualCommunicatorAggregator::SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback)
387 {
388     allocCommunicatorCallback_ = allocCommunicatorCallback;
389 }
390 
SetReleaseCommunicatorCallback(ReleaseCommunicatorCallback releaseCommunicatorCallback)391 void VirtualCommunicatorAggregator::SetReleaseCommunicatorCallback(
392     ReleaseCommunicatorCallback releaseCommunicatorCallback)
393 {
394     releaseCommunicatorCallback_ = releaseCommunicatorCallback;
395 }
396 
MockCommErrCode(int mockErrCode)397 void VirtualCommunicatorAggregator::MockCommErrCode(int mockErrCode)
398 {
399     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
400     commErrCodeMock_ = mockErrCode;
401 }
402 
MockDirectEndFlag(bool isDirectEnd)403 void VirtualCommunicatorAggregator::MockDirectEndFlag(bool isDirectEnd)
404 {
405     isDirectEnd_ = isDirectEnd;
406 }
407 } // namespace DistributedDB
408