1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "virtual_communicator_aggregator.h"
16
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter)
28 {
29 return E_OK;
30 }
31
Finalize()32 void VirtualCommunicatorAggregator::Finalize()
33 {
34 }
35
36 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo)37 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
38 {
39 if (isEnable_) {
40 return AllocCommunicator(remoteDeviceId_, outErrorNo);
41 }
42 return nullptr;
43 }
44
AllocCommunicator(const LabelType & commLabel,int & outErrorNo)45 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo)
46 {
47 LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
48 if (commLabel.size() != COMM_LABEL_LENGTH) {
49 outErrorNo = -E_INVALID_ARGS;
50 return nullptr;
51 }
52
53 if (isEnable_) {
54 return AllocCommunicator(remoteDeviceId_, outErrorNo);
55 }
56 return nullptr;
57 }
58
ReleaseCommunicator(ICommunicator * inCommunicator)59 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
60 {
61 // Called in main thread only
62 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
63 OfflineDevice(communicator->GetDeviceId());
64 {
65 std::lock_guard<std::mutex> lock(communicatorsLock_);
66 communicators_.erase(communicator->GetDeviceId());
67 }
68 RefObject::KillAndDecObjRef(communicator);
69 communicator = nullptr;
70 }
71
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)72 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
73 const Finalizer &inOper)
74 {
75 onCommLack_ = onCommLack;
76 return E_OK;
77 }
78
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)79 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
80 {
81 onConnect_ = onConnect;
82 RunOnConnectCallback("deviceId", true);
83 return E_OK;
84 }
85
RunCommunicatorLackCallback(const LabelType & commLabel)86 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
87 {
88 if (onCommLack_) {
89 std::string userId;
90 onCommLack_(commLabel, userId_);
91 }
92 }
93
RunOnConnectCallback(const std::string & target,bool isConnect)94 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
95 {
96 if (onConnect_) {
97 onConnect_(target, isConnect);
98 }
99 }
100
GetLocalIdentity(std::string & outTarget) const101 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
102 {
103 outTarget = "DEVICES_A";
104 return E_OK;
105 }
106
OnlineDevice(const std::string & deviceId) const107 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
108 {
109 if (!isEnable_) {
110 return;
111 }
112
113 // Called in main thread only
114 for (const auto &iter : communicators_) {
115 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
116 if (iter.first != deviceId) {
117 communicatorTmp->CallbackOnConnect(deviceId, true);
118 }
119 }
120 }
121
OfflineDevice(const std::string & deviceId) const122 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
123 {
124 if (!isEnable_) {
125 return;
126 }
127
128 // Called in main thread only
129 for (const auto &iter : communicators_) {
130 VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
131 if (iter.first != deviceId) {
132 communicatorTmp->CallbackOnConnect(deviceId, false);
133 }
134 }
135 }
136
AllocCommunicator(const std::string & deviceId,int & outErrorNo)137 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
138 {
139 // Called in main thread only
140 VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
141 if (communicator == nullptr) {
142 outErrorNo = -E_OUT_OF_MEMORY;
143 }
144 {
145 std::lock_guard<std::mutex> lock(communicatorsLock_);
146 communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
147 }
148 OnlineDevice(deviceId);
149 return communicator;
150 }
151
GetCommunicator(const std::string & deviceId) const152 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
153 {
154 std::lock_guard<std::mutex> lock(communicatorsLock_);
155 auto iter = communicators_.find(deviceId);
156 if (iter != communicators_.end()) {
157 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
158 return communicator;
159 }
160 return nullptr;
161 }
162
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)163 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
164 const Message *inMsg, const OnSendEnd &onEnd)
165 {
166 if (VirtualCommunicatorAggregator::GetBlockValue()) {
167 std::unique_lock<std::mutex> lock(blockLock_);
168 conditionVar_.wait(lock);
169 }
170
171 if (!isEnable_) {
172 LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
173 delete inMsg;
174 inMsg = nullptr;
175 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
176 }
177 std::lock_guard<std::mutex> lock(communicatorsLock_);
178 auto iter = communicators_.find(dstTarget);
179 if (iter != communicators_.end()) {
180 LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
181 VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
182 if (!communicator->IsEnabled()) {
183 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
184 delete inMsg;
185 inMsg = nullptr;
186 return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
187 }
188 uint32_t messageId = inMsg->GetMessageId();
189 Message *msg = const_cast<Message *>(inMsg);
190 msg->SetTarget(srcTarget);
191 RefObject::IncObjRef(communicator);
192 auto onDispatch = onDispatch_;
193 bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
194 (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
195 uint32_t sendDelayTime = sendDelayTime_;
196 std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
197 if (isNeedDelay) {
198 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
199 }
200 if (onDispatch) {
201 onDispatch(dstTarget, msg);
202 }
203 communicator->CallbackOnMessage(srcTarget, msg);
204 RefObject::DecObjRef(communicator);
205 });
206 DelayTimeHandle(messageId, dstTarget);
207 thread.detach();
208 CallSendEnd(E_OK, onEnd);
209 } else {
210 LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
211 delete inMsg;
212 inMsg = nullptr;
213 CallSendEnd(-E_NOT_FOUND, onEnd);
214 }
215 }
216
SetBlockValue(bool value)217 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
218 {
219 std::unique_lock<std::mutex> lock(blockLock_);
220 isBlock_ = value;
221 if (!value) {
222 conditionVar_.notify_all();
223 }
224 }
225
GetBlockValue() const226 bool VirtualCommunicatorAggregator::GetBlockValue() const
227 {
228 return isBlock_;
229 }
230
Disable()231 void VirtualCommunicatorAggregator::Disable()
232 {
233 isEnable_ = false;
234 }
235
Enable()236 void VirtualCommunicatorAggregator::Enable()
237 {
238 LOGD("[VirtualCommunicatorAggregator] enable");
239 isEnable_ = true;
240 }
241
CallSendEnd(int errCode,const OnSendEnd & onEnd)242 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
243 {
244 if (onEnd) {
245 (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd]() {
246 onEnd(errCode);
247 });
248 }
249 }
250
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)251 void VirtualCommunicatorAggregator::RegOnDispatch(
252 const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
253 {
254 onDispatch_ = onDispatch;
255 }
256
SetCurrentUserId(const std::string & userId)257 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
258 {
259 userId_ = userId;
260 }
261
SetTimeout(const std::string & deviceId,uint32_t timeout)262 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
263 {
264 std::lock_guard<std::mutex> lock(communicatorsLock_);
265 if (communicators_.find(deviceId) != communicators_.end()) {
266 communicators_[deviceId]->SetTimeout(timeout);
267 }
268 }
269
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)270 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
271 uint32_t dropTimes)
272 {
273 std::lock_guard<std::mutex> lock(communicatorsLock_);
274 if (communicators_.find(deviceId) != communicators_.end()) {
275 communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
276 }
277 }
278
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)279 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
280 {
281 std::lock_guard<std::mutex> lock(communicatorsLock_);
282 if (communicators_.find(deviceId) != communicators_.end()) {
283 communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
284 }
285 }
286
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)287 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
288 uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
289 {
290 sendDelayTime_ = sendDelayTime;
291 delayMessageId_ = delayMessageId;
292 delayTimes_ = delayTimes;
293 delayDevices_ = delayDevices;
294 skipTimes_ = skipTimes;
295 }
296
ResetSendDelayInfo()297 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
298 {
299 sendDelayTime_ = 0;
300 delayMessageId_ = INVALID_MESSAGE_ID;
301 delayTimes_ = 0;
302 skipTimes_ = 0;
303 delayDevices_.clear();
304 }
305
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)306 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
307 {
308 if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
309 (delayDevices_.count(dstTarget) > 0)) {
310 delayTimes_--;
311 }
312 if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
313 skipTimes_--;
314 }
315 }
316 } // namespace DistributedDB
317