• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "virtual_communicator_aggregator.h"
16 
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25 
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter)
28 {
29     return E_OK;
30 }
31 
Finalize()32 void VirtualCommunicatorAggregator::Finalize()
33 {
34 }
35 
36 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo)37 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
38 {
39     if (isEnable_) {
40         return AllocCommunicator(remoteDeviceId_, outErrorNo);
41     }
42     return nullptr;
43 }
44 
AllocCommunicator(const LabelType & commLabel,int & outErrorNo)45 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo)
46 {
47     LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
48     if (commLabel.size() != COMM_LABEL_LENGTH) {
49         outErrorNo = -E_INVALID_ARGS;
50         return nullptr;
51     }
52 
53     if (isEnable_) {
54         return AllocCommunicator(remoteDeviceId_, outErrorNo);
55     }
56     return nullptr;
57 }
58 
ReleaseCommunicator(ICommunicator * inCommunicator)59 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
60 {
61     // Called in main thread only
62     VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
63     OfflineDevice(communicator->GetDeviceId());
64     {
65         std::lock_guard<std::mutex> lock(communicatorsLock_);
66         communicators_.erase(communicator->GetDeviceId());
67     }
68     RefObject::KillAndDecObjRef(communicator);
69     communicator = nullptr;
70 }
71 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)72 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
73     const Finalizer &inOper)
74 {
75     onCommLack_ = onCommLack;
76     return E_OK;
77 }
78 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)79 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
80 {
81     onConnect_ = onConnect;
82     RunOnConnectCallback("deviceId", true);
83     return E_OK;
84 }
85 
RunCommunicatorLackCallback(const LabelType & commLabel)86 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
87 {
88     if (onCommLack_) {
89         std::string userId;
90         onCommLack_(commLabel, userId_);
91     }
92 }
93 
RunOnConnectCallback(const std::string & target,bool isConnect)94 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
95 {
96     if (onConnect_) {
97         onConnect_(target, isConnect);
98     }
99 }
100 
GetLocalIdentity(std::string & outTarget) const101 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
102 {
103     outTarget = "DEVICES_A";
104     return E_OK;
105 }
106 
OnlineDevice(const std::string & deviceId) const107 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
108 {
109     if (!isEnable_) {
110         return;
111     }
112 
113     // Called in main thread only
114     for (const auto &iter : communicators_) {
115         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
116         if (iter.first != deviceId) {
117             communicatorTmp->CallbackOnConnect(deviceId, true);
118         }
119     }
120 }
121 
OfflineDevice(const std::string & deviceId) const122 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
123 {
124     if (!isEnable_) {
125         return;
126     }
127 
128     // Called in main thread only
129     for (const auto &iter : communicators_) {
130         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
131         if (iter.first != deviceId) {
132             communicatorTmp->CallbackOnConnect(deviceId, false);
133         }
134     }
135 }
136 
AllocCommunicator(const std::string & deviceId,int & outErrorNo)137 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
138 {
139     // Called in main thread only
140     VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
141     if (communicator == nullptr) {
142         outErrorNo = -E_OUT_OF_MEMORY;
143     }
144     {
145         std::lock_guard<std::mutex> lock(communicatorsLock_);
146         communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
147     }
148     OnlineDevice(deviceId);
149     return communicator;
150 }
151 
GetCommunicator(const std::string & deviceId) const152 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
153 {
154     std::lock_guard<std::mutex> lock(communicatorsLock_);
155     auto iter = communicators_.find(deviceId);
156     if (iter != communicators_.end()) {
157         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
158         return communicator;
159     }
160     return nullptr;
161 }
162 
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)163 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
164     const Message *inMsg, const OnSendEnd &onEnd)
165 {
166     if (VirtualCommunicatorAggregator::GetBlockValue()) {
167         std::unique_lock<std::mutex> lock(blockLock_);
168         conditionVar_.wait(lock);
169     }
170 
171     if (!isEnable_) {
172         LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
173         delete inMsg;
174         inMsg = nullptr;
175         return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
176     }
177     std::lock_guard<std::mutex> lock(communicatorsLock_);
178     auto iter = communicators_.find(dstTarget);
179     if (iter != communicators_.end()) {
180         LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
181         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
182         if (!communicator->IsEnabled()) {
183             LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
184             delete inMsg;
185             inMsg = nullptr;
186             return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
187         }
188         uint32_t messageId = inMsg->GetMessageId();
189         Message *msg = const_cast<Message *>(inMsg);
190         msg->SetTarget(srcTarget);
191         RefObject::IncObjRef(communicator);
192         auto onDispatch = onDispatch_;
193         bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
194             (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
195         uint32_t sendDelayTime = sendDelayTime_;
196         std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
197             if (isNeedDelay) {
198                 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
199             }
200             if (onDispatch) {
201                 onDispatch(dstTarget, msg);
202             }
203             communicator->CallbackOnMessage(srcTarget, msg);
204             RefObject::DecObjRef(communicator);
205         });
206         DelayTimeHandle(messageId, dstTarget);
207         thread.detach();
208         CallSendEnd(E_OK, onEnd);
209     } else {
210         LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
211         delete inMsg;
212         inMsg = nullptr;
213         CallSendEnd(-E_NOT_FOUND, onEnd);
214     }
215 }
216 
SetBlockValue(bool value)217 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
218 {
219     std::unique_lock<std::mutex> lock(blockLock_);
220     isBlock_ = value;
221     if (!value) {
222         conditionVar_.notify_all();
223     }
224 }
225 
GetBlockValue() const226 bool VirtualCommunicatorAggregator::GetBlockValue() const
227 {
228     return isBlock_;
229 }
230 
Disable()231 void VirtualCommunicatorAggregator::Disable()
232 {
233     isEnable_ = false;
234 }
235 
Enable()236 void VirtualCommunicatorAggregator::Enable()
237 {
238     LOGD("[VirtualCommunicatorAggregator] enable");
239     isEnable_ = true;
240 }
241 
CallSendEnd(int errCode,const OnSendEnd & onEnd)242 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
243 {
244     if (onEnd) {
245         (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd]() {
246             onEnd(errCode);
247         });
248     }
249 }
250 
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)251 void VirtualCommunicatorAggregator::RegOnDispatch(
252     const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
253 {
254     onDispatch_ = onDispatch;
255 }
256 
SetCurrentUserId(const std::string & userId)257 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
258 {
259     userId_ = userId;
260 }
261 
SetTimeout(const std::string & deviceId,uint32_t timeout)262 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
263 {
264     std::lock_guard<std::mutex> lock(communicatorsLock_);
265     if (communicators_.find(deviceId) != communicators_.end()) {
266         communicators_[deviceId]->SetTimeout(timeout);
267     }
268 }
269 
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)270 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
271     uint32_t dropTimes)
272 {
273     std::lock_guard<std::mutex> lock(communicatorsLock_);
274     if (communicators_.find(deviceId) != communicators_.end()) {
275         communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
276     }
277 }
278 
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)279 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
280 {
281     std::lock_guard<std::mutex> lock(communicatorsLock_);
282     if (communicators_.find(deviceId) != communicators_.end()) {
283         communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
284     }
285 }
286 
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)287 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
288     uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
289 {
290     sendDelayTime_ = sendDelayTime;
291     delayMessageId_ = delayMessageId;
292     delayTimes_ = delayTimes;
293     delayDevices_ = delayDevices;
294     skipTimes_ = skipTimes;
295 }
296 
ResetSendDelayInfo()297 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
298 {
299     sendDelayTime_ = 0;
300     delayMessageId_ = INVALID_MESSAGE_ID;
301     delayTimes_ = 0;
302     skipTimes_ = 0;
303     delayDevices_.clear();
304 }
305 
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)306 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
307 {
308     if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
309         (delayDevices_.count(dstTarget) > 0)) {
310         delayTimes_--;
311     }
312     if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
313         skipTimes_--;
314     }
315 }
316 } // namespace DistributedDB
317