1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "virtual_communicator_aggregator.h"
16
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter,
28 const std::shared_ptr<DBStatusAdapter> &statusAdapter)
29 {
30 return E_OK;
31 }
32
Finalize()33 void VirtualCommunicatorAggregator::Finalize()
34 {
35 }
36
37 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo)38 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
39 {
40 if (isEnable_) {
41 return AllocCommunicator(remoteDeviceId_, outErrorNo);
42 }
43 return nullptr;
44 }
45
AllocCommunicator(const LabelType & commLabel,int & outErrorNo)46 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo)
47 {
48 LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
49 if (commLabel.size() != COMM_LABEL_LENGTH) {
50 outErrorNo = -E_INVALID_ARGS;
51 return nullptr;
52 }
53
54 if (isEnable_) {
55 return AllocCommunicator(remoteDeviceId_, outErrorNo);
56 }
57 return nullptr;
58 }
59
ReleaseCommunicator(ICommunicator * inCommunicator)60 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
61 {
62 // Called in main thread only
63 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
64 OfflineDevice(communicator->GetDeviceId());
65 {
66 std::lock_guard<std::mutex> lock(communicatorsLock_);
67 communicators_.erase(communicator->GetDeviceId());
68 }
69 RefObject::KillAndDecObjRef(communicator);
70 communicator = nullptr;
71 }
72
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)73 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
74 const Finalizer &inOper)
75 {
76 onCommLack_ = onCommLack;
77 return E_OK;
78 }
79
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)80 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
81 {
82 onConnect_ = onConnect;
83 RunOnConnectCallback("deviceId", true);
84 return E_OK;
85 }
86
RunCommunicatorLackCallback(const LabelType & commLabel)87 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
88 {
89 if (onCommLack_) {
90 onCommLack_(commLabel, userId_);
91 }
92 }
93
RunOnConnectCallback(const std::string & target,bool isConnect)94 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
95 {
96 if (onConnect_) {
97 onConnect_(target, isConnect);
98 }
99 }
100
GetLocalIdentity(std::string & outTarget) const101 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
102 {
103 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
104 if (localDeviceId_.empty()) {
105 outTarget = "DEVICES_A";
106 } else {
107 outTarget = localDeviceId_;
108 }
109 return getLocalDeviceRet_;
110 }
111
OnlineDevice(const std::string & deviceId) const112 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
113 {
114 if (!isEnable_) {
115 return;
116 }
117
118 // Called in main thread only
119 for (const auto &iter : communicators_) {
120 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
121 if (iter.first != deviceId) {
122 communicatorTmp->CallbackOnConnect(deviceId, true);
123 }
124 }
125 }
126
OfflineDevice(const std::string & deviceId) const127 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
128 {
129 if (!isEnable_) {
130 return;
131 }
132
133 // Called in main thread only
134 for (const auto &iter : communicators_) {
135 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
136 if (iter.first != deviceId) {
137 communicatorTmp->CallbackOnConnect(deviceId, false);
138 }
139 }
140 }
141
AllocCommunicator(const std::string & deviceId,int & outErrorNo)142 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
143 {
144 // Called in main thread only
145 VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
146 if (communicator == nullptr) {
147 outErrorNo = -E_OUT_OF_MEMORY;
148 }
149 {
150 std::lock_guard<std::mutex> lock(communicatorsLock_);
151 communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
152 }
153 OnlineDevice(deviceId);
154 return communicator;
155 }
156
GetCommunicator(const std::string & deviceId) const157 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
158 {
159 std::lock_guard<std::mutex> lock(communicatorsLock_);
160 auto iter = communicators_.find(deviceId);
161 if (iter != communicators_.end()) {
162 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
163 return communicator;
164 }
165 return nullptr;
166 }
167
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)168 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
169 const Message *inMsg, const OnSendEnd &onEnd)
170 {
171 if (VirtualCommunicatorAggregator::GetBlockValue()) {
172 std::unique_lock<std::mutex> lock(blockLock_);
173 conditionVar_.wait(lock);
174 }
175
176 if (!isEnable_) {
177 LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
178 delete inMsg;
179 inMsg = nullptr;
180 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
181 }
182 if (beforeDispatch_) {
183 beforeDispatch_(dstTarget, inMsg);
184 }
185 DispatchMessageInner(srcTarget, dstTarget, inMsg, onEnd);
186 }
187
DispatchMessageInner(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)188 void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget,
189 const Message *inMsg, const OnSendEnd &onEnd)
190 {
191 std::lock_guard<std::mutex> lock(communicatorsLock_);
192 auto iter = communicators_.find(dstTarget);
193 if (iter != communicators_.end()) {
194 LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
195 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
196 if (!communicator->IsEnabled()) {
197 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
198 delete inMsg;
199 inMsg = nullptr;
200 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
201 }
202 uint32_t messageId = inMsg->GetMessageId();
203 Message *msg = const_cast<Message *>(inMsg);
204 msg->SetTarget(srcTarget);
205 RefObject::IncObjRef(communicator);
206 auto onDispatch = onDispatch_;
207 bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
208 (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
209 uint32_t sendDelayTime = sendDelayTime_;
210 std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
211 if (isNeedDelay) {
212 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
213 }
214 if (onDispatch) {
215 onDispatch(dstTarget, msg);
216 }
217 communicator->CallbackOnMessage(srcTarget, msg);
218 RefObject::DecObjRef(communicator);
219 });
220 DelayTimeHandle(messageId, dstTarget);
221 thread.detach();
222 CallSendEnd(E_OK, onEnd);
223 } else {
224 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
225 delete inMsg;
226 inMsg = nullptr;
227 CallSendEnd(-E_NOT_FOUND, onEnd);
228 }
229 }
230
SetBlockValue(bool value)231 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
232 {
233 std::unique_lock<std::mutex> lock(blockLock_);
234 isBlock_ = value;
235 if (!value) {
236 conditionVar_.notify_all();
237 }
238 }
239
GetBlockValue() const240 bool VirtualCommunicatorAggregator::GetBlockValue() const
241 {
242 return isBlock_;
243 }
244
Disable()245 void VirtualCommunicatorAggregator::Disable()
246 {
247 isEnable_ = false;
248 }
249
Enable()250 void VirtualCommunicatorAggregator::Enable()
251 {
252 LOGD("[VirtualCommunicatorAggregator] enable");
253 isEnable_ = true;
254 }
255
CallSendEnd(int errCode,const OnSendEnd & onEnd)256 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
257 {
258 if (onEnd) {
259 (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd]() {
260 onEnd(errCode);
261 });
262 }
263 }
264
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)265 void VirtualCommunicatorAggregator::RegOnDispatch(
266 const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
267 {
268 onDispatch_ = onDispatch;
269 }
270
SetCurrentUserId(const std::string & userId)271 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
272 {
273 userId_ = userId;
274 }
275
SetTimeout(const std::string & deviceId,uint32_t timeout)276 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
277 {
278 std::lock_guard<std::mutex> lock(communicatorsLock_);
279 if (communicators_.find(deviceId) != communicators_.end()) {
280 communicators_[deviceId]->SetTimeout(timeout);
281 }
282 }
283
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)284 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
285 uint32_t dropTimes)
286 {
287 std::lock_guard<std::mutex> lock(communicatorsLock_);
288 if (communicators_.find(deviceId) != communicators_.end()) {
289 communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
290 }
291 }
292
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)293 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
294 {
295 std::lock_guard<std::mutex> lock(communicatorsLock_);
296 if (communicators_.find(deviceId) != communicators_.end()) {
297 communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
298 }
299 }
300
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)301 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
302 uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
303 {
304 sendDelayTime_ = sendDelayTime;
305 delayMessageId_ = delayMessageId;
306 delayTimes_ = delayTimes;
307 delayDevices_ = delayDevices;
308 skipTimes_ = skipTimes;
309 }
310
ResetSendDelayInfo()311 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
312 {
313 sendDelayTime_ = 0;
314 delayMessageId_ = INVALID_MESSAGE_ID;
315 delayTimes_ = 0;
316 skipTimes_ = 0;
317 delayDevices_.clear();
318 }
319
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)320 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
321 {
322 if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
323 (delayDevices_.count(dstTarget) > 0)) {
324 delayTimes_--;
325 }
326 if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
327 skipTimes_--;
328 }
329 }
330
GetOnlineDevices()331 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
332 {
333 std::lock_guard<std::mutex> lock(communicatorsLock_);
334 std::set<std::string> onlineDevices;
335 for (const auto &item: communicators_) {
336 onlineDevices.insert(item.first);
337 }
338 return onlineDevices;
339 }
340
DisableCommunicator()341 void VirtualCommunicatorAggregator::DisableCommunicator()
342 {
343 std::lock_guard<std::mutex> lock(communicatorsLock_);
344 for (const auto &communicator: communicators_) {
345 communicator.second->Disable();
346 }
347 }
348
EnableCommunicator()349 void VirtualCommunicatorAggregator::EnableCommunicator()
350 {
351 std::lock_guard<std::mutex> lock(communicatorsLock_);
352 for (const auto &communicator: communicators_) {
353 communicator.second->Disable();
354 }
355 }
356
RegBeforeDispatch(const std::function<void (const std::string &,const Message *)> & beforeDispatch)357 void VirtualCommunicatorAggregator::RegBeforeDispatch(
358 const std::function<void(const std::string &, const Message *)> &beforeDispatch)
359 {
360 beforeDispatch_ = beforeDispatch;
361 }
362
SetLocalDeviceId(const std::string & deviceId)363 void VirtualCommunicatorAggregator::SetLocalDeviceId(const std::string &deviceId)
364 {
365 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
366 localDeviceId_ = deviceId;
367 }
368
MockGetLocalDeviceRes(int mockRes)369 void VirtualCommunicatorAggregator::MockGetLocalDeviceRes(int mockRes)
370 {
371 std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
372 getLocalDeviceRet_ = mockRes;
373 }
374 } // namespace DistributedDB
375