1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "virtual_communicator_aggregator.h"
16
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter)
28 {
29 return E_OK;
30 }
31
Finalize()32 void VirtualCommunicatorAggregator::Finalize()
33 {
34 }
35
36 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo)37 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
38 {
39 if (isEnable_) {
40 return AllocCommunicator(remoteDeviceId_, outErrorNo);
41 }
42 return nullptr;
43 }
44
AllocCommunicator(const LabelType & commLabel,int & outErrorNo)45 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo)
46 {
47 LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
48 if (commLabel.size() != COMM_LABEL_LENGTH) {
49 outErrorNo = -E_INVALID_ARGS;
50 return nullptr;
51 }
52
53 if (isEnable_) {
54 return AllocCommunicator(remoteDeviceId_, outErrorNo);
55 }
56 return nullptr;
57 }
58
ReleaseCommunicator(ICommunicator * inCommunicator)59 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
60 {
61 // Called in main thread only
62 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
63 OfflineDevice(communicator->GetDeviceId());
64 {
65 std::lock_guard<std::mutex> lock(communicatorsLock_);
66 communicators_.erase(communicator->GetDeviceId());
67 }
68 RefObject::KillAndDecObjRef(communicator);
69 communicator = nullptr;
70 }
71
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)72 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
73 const Finalizer &inOper)
74 {
75 onCommLack_ = onCommLack;
76 return E_OK;
77 }
78
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)79 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
80 {
81 onConnect_ = onConnect;
82 RunOnConnectCallback("deviceId", true);
83 return E_OK;
84 }
85
RunCommunicatorLackCallback(const LabelType & commLabel)86 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
87 {
88 if (onCommLack_) {
89 onCommLack_(commLabel, userId_);
90 }
91 }
92
RunOnConnectCallback(const std::string & target,bool isConnect)93 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
94 {
95 if (onConnect_) {
96 onConnect_(target, isConnect);
97 }
98 }
99
GetLocalIdentity(std::string & outTarget) const100 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
101 {
102 outTarget = "DEVICES_A";
103 return E_OK;
104 }
105
OnlineDevice(const std::string & deviceId) const106 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
107 {
108 if (!isEnable_) {
109 return;
110 }
111
112 // Called in main thread only
113 for (const auto &iter : communicators_) {
114 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
115 if (iter.first != deviceId) {
116 communicatorTmp->CallbackOnConnect(deviceId, true);
117 }
118 }
119 }
120
OfflineDevice(const std::string & deviceId) const121 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
122 {
123 if (!isEnable_) {
124 return;
125 }
126
127 // Called in main thread only
128 for (const auto &iter : communicators_) {
129 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
130 if (iter.first != deviceId) {
131 communicatorTmp->CallbackOnConnect(deviceId, false);
132 }
133 }
134 }
135
AllocCommunicator(const std::string & deviceId,int & outErrorNo)136 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
137 {
138 // Called in main thread only
139 VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
140 if (communicator == nullptr) {
141 outErrorNo = -E_OUT_OF_MEMORY;
142 }
143 {
144 std::lock_guard<std::mutex> lock(communicatorsLock_);
145 communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
146 }
147 OnlineDevice(deviceId);
148 return communicator;
149 }
150
GetCommunicator(const std::string & deviceId) const151 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
152 {
153 std::lock_guard<std::mutex> lock(communicatorsLock_);
154 auto iter = communicators_.find(deviceId);
155 if (iter != communicators_.end()) {
156 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
157 return communicator;
158 }
159 return nullptr;
160 }
161
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)162 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
163 const Message *inMsg, const OnSendEnd &onEnd)
164 {
165 if (VirtualCommunicatorAggregator::GetBlockValue()) {
166 std::unique_lock<std::mutex> lock(blockLock_);
167 conditionVar_.wait(lock);
168 }
169
170 if (!isEnable_) {
171 LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
172 delete inMsg;
173 inMsg = nullptr;
174 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
175 }
176 std::lock_guard<std::mutex> lock(communicatorsLock_);
177 auto iter = communicators_.find(dstTarget);
178 if (iter != communicators_.end()) {
179 LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
180 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
181 if (!communicator->IsEnabled()) {
182 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
183 delete inMsg;
184 inMsg = nullptr;
185 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
186 }
187 uint32_t messageId = inMsg->GetMessageId();
188 Message *msg = const_cast<Message *>(inMsg);
189 msg->SetTarget(srcTarget);
190 RefObject::IncObjRef(communicator);
191 auto onDispatch = onDispatch_;
192 bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
193 (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
194 uint32_t sendDelayTime = sendDelayTime_;
195 std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
196 if (isNeedDelay) {
197 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
198 }
199 if (onDispatch) {
200 onDispatch(dstTarget, msg);
201 }
202 communicator->CallbackOnMessage(srcTarget, msg);
203 RefObject::DecObjRef(communicator);
204 });
205 DelayTimeHandle(messageId, dstTarget);
206 thread.detach();
207 CallSendEnd(E_OK, onEnd);
208 } else {
209 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
210 delete inMsg;
211 inMsg = nullptr;
212 CallSendEnd(-E_NOT_FOUND, onEnd);
213 }
214 }
215
SetBlockValue(bool value)216 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
217 {
218 std::unique_lock<std::mutex> lock(blockLock_);
219 isBlock_ = value;
220 if (!value) {
221 conditionVar_.notify_all();
222 }
223 }
224
GetBlockValue() const225 bool VirtualCommunicatorAggregator::GetBlockValue() const
226 {
227 return isBlock_;
228 }
229
Disable()230 void VirtualCommunicatorAggregator::Disable()
231 {
232 isEnable_ = false;
233 }
234
Enable()235 void VirtualCommunicatorAggregator::Enable()
236 {
237 LOGD("[VirtualCommunicatorAggregator] enable");
238 isEnable_ = true;
239 }
240
CallSendEnd(int errCode,const OnSendEnd & onEnd)241 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
242 {
243 if (onEnd) {
244 (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd]() {
245 onEnd(errCode);
246 });
247 }
248 }
249
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)250 void VirtualCommunicatorAggregator::RegOnDispatch(
251 const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
252 {
253 onDispatch_ = onDispatch;
254 }
255
SetCurrentUserId(const std::string & userId)256 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
257 {
258 userId_ = userId;
259 }
260
SetTimeout(const std::string & deviceId,uint32_t timeout)261 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
262 {
263 std::lock_guard<std::mutex> lock(communicatorsLock_);
264 if (communicators_.find(deviceId) != communicators_.end()) {
265 communicators_[deviceId]->SetTimeout(timeout);
266 }
267 }
268
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)269 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
270 uint32_t dropTimes)
271 {
272 std::lock_guard<std::mutex> lock(communicatorsLock_);
273 if (communicators_.find(deviceId) != communicators_.end()) {
274 communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
275 }
276 }
277
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)278 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
279 {
280 std::lock_guard<std::mutex> lock(communicatorsLock_);
281 if (communicators_.find(deviceId) != communicators_.end()) {
282 communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
283 }
284 }
285
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)286 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
287 uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
288 {
289 sendDelayTime_ = sendDelayTime;
290 delayMessageId_ = delayMessageId;
291 delayTimes_ = delayTimes;
292 delayDevices_ = delayDevices;
293 skipTimes_ = skipTimes;
294 }
295
ResetSendDelayInfo()296 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
297 {
298 sendDelayTime_ = 0;
299 delayMessageId_ = INVALID_MESSAGE_ID;
300 delayTimes_ = 0;
301 skipTimes_ = 0;
302 delayDevices_.clear();
303 }
304
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)305 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
306 {
307 if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
308 (delayDevices_.count(dstTarget) > 0)) {
309 delayTimes_--;
310 }
311 if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
312 skipTimes_--;
313 }
314 }
315
GetOnlineDevices()316 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
317 {
318 std::lock_guard<std::mutex> lock(communicatorsLock_);
319 std::set<std::string> onlineDevices;
320 for (const auto &item: communicators_) {
321 onlineDevices.insert(item.first);
322 }
323 return onlineDevices;
324 }
325
DisableCommunicator()326 void VirtualCommunicatorAggregator::DisableCommunicator()
327 {
328 std::lock_guard<std::mutex> lock(communicatorsLock_);
329 for (const auto &communicator: communicators_) {
330 communicator.second->Disable();
331 }
332 }
333
EnableCommunicator()334 void VirtualCommunicatorAggregator::EnableCommunicator()
335 {
336 std::lock_guard<std::mutex> lock(communicatorsLock_);
337 for (const auto &communicator: communicators_) {
338 communicator.second->Disable();
339 }
340 }
341 } // namespace DistributedDB
342