• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "virtual_communicator_aggregator.h"
16 
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25 
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter)
28 {
29     return E_OK;
30 }
31 
Finalize()32 void VirtualCommunicatorAggregator::Finalize()
33 {
34 }
35 
36 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo)37 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
38 {
39     if (isEnable_) {
40         return AllocCommunicator(remoteDeviceId_, outErrorNo);
41     }
42     return nullptr;
43 }
44 
AllocCommunicator(const LabelType & commLabel,int & outErrorNo)45 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo)
46 {
47     LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
48     if (commLabel.size() != COMM_LABEL_LENGTH) {
49         outErrorNo = -E_INVALID_ARGS;
50         return nullptr;
51     }
52 
53     if (isEnable_) {
54         return AllocCommunicator(remoteDeviceId_, outErrorNo);
55     }
56     return nullptr;
57 }
58 
ReleaseCommunicator(ICommunicator * inCommunicator)59 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
60 {
61     // Called in main thread only
62     VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
63     OfflineDevice(communicator->GetDeviceId());
64     {
65         std::lock_guard<std::mutex> lock(communicatorsLock_);
66         communicators_.erase(communicator->GetDeviceId());
67     }
68     RefObject::KillAndDecObjRef(communicator);
69     communicator = nullptr;
70 }
71 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)72 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
73     const Finalizer &inOper)
74 {
75     onCommLack_ = onCommLack;
76     return E_OK;
77 }
78 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)79 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
80 {
81     onConnect_ = onConnect;
82     RunOnConnectCallback("deviceId", true);
83     return E_OK;
84 }
85 
RunCommunicatorLackCallback(const LabelType & commLabel)86 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
87 {
88     if (onCommLack_) {
89         onCommLack_(commLabel, userId_);
90     }
91 }
92 
RunOnConnectCallback(const std::string & target,bool isConnect)93 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
94 {
95     if (onConnect_) {
96         onConnect_(target, isConnect);
97     }
98 }
99 
GetLocalIdentity(std::string & outTarget) const100 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
101 {
102     outTarget = "DEVICES_A";
103     return E_OK;
104 }
105 
OnlineDevice(const std::string & deviceId) const106 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
107 {
108     if (!isEnable_) {
109         return;
110     }
111 
112     // Called in main thread only
113     for (const auto &iter : communicators_) {
114         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
115         if (iter.first != deviceId) {
116             communicatorTmp->CallbackOnConnect(deviceId, true);
117         }
118     }
119 }
120 
OfflineDevice(const std::string & deviceId) const121 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
122 {
123     if (!isEnable_) {
124         return;
125     }
126 
127     // Called in main thread only
128     for (const auto &iter : communicators_) {
129         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
130         if (iter.first != deviceId) {
131             communicatorTmp->CallbackOnConnect(deviceId, false);
132         }
133     }
134 }
135 
AllocCommunicator(const std::string & deviceId,int & outErrorNo)136 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
137 {
138     // Called in main thread only
139     VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
140     if (communicator == nullptr) {
141         outErrorNo = -E_OUT_OF_MEMORY;
142     }
143     {
144         std::lock_guard<std::mutex> lock(communicatorsLock_);
145         communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
146     }
147     OnlineDevice(deviceId);
148     return communicator;
149 }
150 
GetCommunicator(const std::string & deviceId) const151 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
152 {
153     std::lock_guard<std::mutex> lock(communicatorsLock_);
154     auto iter = communicators_.find(deviceId);
155     if (iter != communicators_.end()) {
156         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
157         return communicator;
158     }
159     return nullptr;
160 }
161 
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)162 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
163     const Message *inMsg, const OnSendEnd &onEnd)
164 {
165     if (VirtualCommunicatorAggregator::GetBlockValue()) {
166         std::unique_lock<std::mutex> lock(blockLock_);
167         conditionVar_.wait(lock);
168     }
169 
170     if (!isEnable_) {
171         LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
172         delete inMsg;
173         inMsg = nullptr;
174         return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
175     }
176     std::lock_guard<std::mutex> lock(communicatorsLock_);
177     auto iter = communicators_.find(dstTarget);
178     if (iter != communicators_.end()) {
179         LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
180         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
181         if (!communicator->IsEnabled()) {
182             LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
183             delete inMsg;
184             inMsg = nullptr;
185             return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
186         }
187         uint32_t messageId = inMsg->GetMessageId();
188         Message *msg = const_cast<Message *>(inMsg);
189         msg->SetTarget(srcTarget);
190         RefObject::IncObjRef(communicator);
191         auto onDispatch = onDispatch_;
192         bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
193             (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
194         uint32_t sendDelayTime = sendDelayTime_;
195         std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
196             if (isNeedDelay) {
197                 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
198             }
199             if (onDispatch) {
200                 onDispatch(dstTarget, msg);
201             }
202             communicator->CallbackOnMessage(srcTarget, msg);
203             RefObject::DecObjRef(communicator);
204         });
205         DelayTimeHandle(messageId, dstTarget);
206         thread.detach();
207         CallSendEnd(E_OK, onEnd);
208     } else {
209         LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
210         delete inMsg;
211         inMsg = nullptr;
212         CallSendEnd(-E_NOT_FOUND, onEnd);
213     }
214 }
215 
SetBlockValue(bool value)216 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
217 {
218     std::unique_lock<std::mutex> lock(blockLock_);
219     isBlock_ = value;
220     if (!value) {
221         conditionVar_.notify_all();
222     }
223 }
224 
GetBlockValue() const225 bool VirtualCommunicatorAggregator::GetBlockValue() const
226 {
227     return isBlock_;
228 }
229 
Disable()230 void VirtualCommunicatorAggregator::Disable()
231 {
232     isEnable_ = false;
233 }
234 
Enable()235 void VirtualCommunicatorAggregator::Enable()
236 {
237     LOGD("[VirtualCommunicatorAggregator] enable");
238     isEnable_ = true;
239 }
240 
CallSendEnd(int errCode,const OnSendEnd & onEnd)241 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
242 {
243     if (onEnd) {
244         (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd]() {
245             onEnd(errCode);
246         });
247     }
248 }
249 
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)250 void VirtualCommunicatorAggregator::RegOnDispatch(
251     const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
252 {
253     onDispatch_ = onDispatch;
254 }
255 
SetCurrentUserId(const std::string & userId)256 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
257 {
258     userId_ = userId;
259 }
260 
SetTimeout(const std::string & deviceId,uint32_t timeout)261 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
262 {
263     std::lock_guard<std::mutex> lock(communicatorsLock_);
264     if (communicators_.find(deviceId) != communicators_.end()) {
265         communicators_[deviceId]->SetTimeout(timeout);
266     }
267 }
268 
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)269 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
270     uint32_t dropTimes)
271 {
272     std::lock_guard<std::mutex> lock(communicatorsLock_);
273     if (communicators_.find(deviceId) != communicators_.end()) {
274         communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
275     }
276 }
277 
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)278 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
279 {
280     std::lock_guard<std::mutex> lock(communicatorsLock_);
281     if (communicators_.find(deviceId) != communicators_.end()) {
282         communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
283     }
284 }
285 
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)286 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
287     uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
288 {
289     sendDelayTime_ = sendDelayTime;
290     delayMessageId_ = delayMessageId;
291     delayTimes_ = delayTimes;
292     delayDevices_ = delayDevices;
293     skipTimes_ = skipTimes;
294 }
295 
ResetSendDelayInfo()296 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
297 {
298     sendDelayTime_ = 0;
299     delayMessageId_ = INVALID_MESSAGE_ID;
300     delayTimes_ = 0;
301     skipTimes_ = 0;
302     delayDevices_.clear();
303 }
304 
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)305 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
306 {
307     if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
308         (delayDevices_.count(dstTarget) > 0)) {
309         delayTimes_--;
310     }
311     if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
312         skipTimes_--;
313     }
314 }
315 
GetOnlineDevices()316 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
317 {
318     std::lock_guard<std::mutex> lock(communicatorsLock_);
319     std::set<std::string> onlineDevices;
320     for (const auto &item: communicators_) {
321         onlineDevices.insert(item.first);
322     }
323     return onlineDevices;
324 }
325 
DisableCommunicator()326 void VirtualCommunicatorAggregator::DisableCommunicator()
327 {
328     std::lock_guard<std::mutex> lock(communicatorsLock_);
329     for (const auto &communicator: communicators_) {
330         communicator.second->Disable();
331     }
332 }
333 
EnableCommunicator()334 void VirtualCommunicatorAggregator::EnableCommunicator()
335 {
336     std::lock_guard<std::mutex> lock(communicatorsLock_);
337     for (const auto &communicator: communicators_) {
338         communicator.second->Disable();
339     }
340 }
341 } // namespace DistributedDB
342