1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_communicator.h"
17
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 #include "single_ver_serialize_manager.h"
21 #include "sync_engine.h"
22 #include "virtual_communicator_aggregator.h"
23
24 namespace DistributedDB {
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)25 int VirtualCommunicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
26 {
27 std::lock_guard<std::mutex> lock(onMessageLock_);
28 onMessage_ = onMessage;
29 return E_OK;
30 }
31
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)32 int VirtualCommunicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
33 {
34 std::lock_guard<std::mutex> lock(onConnectLock_);
35 onConnect_ = onConnect;
36 return E_OK;
37 }
38
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)39 int VirtualCommunicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
40 {
41 return E_OK;
42 }
43
Activate()44 void VirtualCommunicator::Activate()
45 {
46 }
47
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)48 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
49 {
50 return SendMessage(dstTarget, inMsg, config, nullptr);
51 }
52
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)53 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
54 const OnSendEnd &onEnd)
55 {
56 AutoLock lock(this);
57 if (IsKilled()) {
58 return -E_OBJ_IS_KILLED;
59 }
60 if (!isEnable_) {
61 LOGD("[VirtualCommunicator] the VirtualCommunicator disabled!");
62 return -E_PERIPHERAL_INTERFACE_FAIL;
63 }
64 if (dstTarget == deviceId_) {
65 delete inMsg;
66 inMsg = nullptr;
67 return E_OK;
68 }
69 Message *message = nullptr;
70 int errCode = TranslateMsg(inMsg, message);
71 if (errCode == -E_NOT_REGISTER) {
72 communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, inMsg, onEnd);
73 return E_OK;
74 }
75 if (errCode != E_OK) {
76 return errCode;
77 }
78 delete inMsg;
79 inMsg = nullptr;
80 communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, message, onEnd);
81 return E_OK;
82 }
83
GetRemoteCommunicatorVersion(const std::string & deviceId,uint16_t & version) const84 int VirtualCommunicator::GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const
85 {
86 version = UINT16_MAX;
87 return E_OK;
88 }
89
CallbackOnMessage(const std::string & srcTarget,Message * inMsg)90 void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg)
91 {
92 std::lock_guard<std::mutex> lock(onMessageLock_);
93 if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
94 (dropMsgTimes_ == 0))) {
95 onMessage_(srcTarget, inMsg);
96 } else {
97 LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
98 if (dropMsgTimes_ > 0) {
99 dropMsgTimes_--;
100 }
101 delete inMsg;
102 inMsg = nullptr;
103 }
104 }
105
CallbackOnConnect(const std::string & target,bool isConnect) const106 void VirtualCommunicator::CallbackOnConnect(const std::string &target, bool isConnect) const
107 {
108 {
109 std::lock_guard<std::mutex> lock(devicesMapLock_);
110 if (target != deviceId_) {
111 onlineDevicesMap_[target] = isConnect;
112 }
113 }
114 std::lock_guard<std::mutex> lock(onConnectLock_);
115 if (isEnable_ && onConnect_) {
116 onConnect_(target, isConnect);
117 }
118 }
119
GetCommunicatorMtuSize() const120 uint32_t VirtualCommunicator::GetCommunicatorMtuSize() const
121 {
122 return mtuSize_;
123 }
124
GetCommunicatorMtuSize(const std::string & target) const125 uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) const
126 {
127 return GetCommunicatorMtuSize();
128 }
129
SetCommunicatorMtuSize(uint32_t mtuSize)130 void VirtualCommunicator::SetCommunicatorMtuSize(uint32_t mtuSize)
131 {
132 mtuSize_ = mtuSize;
133 }
134
GetTimeout() const135 uint32_t VirtualCommunicator::GetTimeout() const
136 {
137 return timeout_;
138 }
139
GetTimeout(const std::string & target) const140 uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const
141 {
142 return GetTimeout();
143 }
144
SetTimeout(uint32_t timeout)145 void VirtualCommunicator::SetTimeout(uint32_t timeout)
146 {
147 timeout_ = timeout;
148 }
149
GetLocalIdentity(std::string & outTarget) const150 int VirtualCommunicator::GetLocalIdentity(std::string &outTarget) const
151 {
152 outTarget = deviceId_;
153 return E_OK;
154 }
155
GeneralVirtualSyncId()156 int VirtualCommunicator::GeneralVirtualSyncId()
157 {
158 std::lock_guard<std::mutex> lock(syncIdLock_);
159 currentSyncId_++;
160 return currentSyncId_;
161 }
162
Disable()163 void VirtualCommunicator::Disable()
164 {
165 isEnable_ = false;
166 }
167
Enable()168 void VirtualCommunicator::Enable()
169 {
170 isEnable_ = true;
171 }
172
SetDeviceId(const std::string & deviceId)173 void VirtualCommunicator::SetDeviceId(const std::string &deviceId)
174 {
175 deviceId_ = deviceId;
176 }
177
GetDeviceId() const178 std::string VirtualCommunicator::GetDeviceId() const
179 {
180 return deviceId_;
181 }
182
IsEnabled() const183 bool VirtualCommunicator::IsEnabled() const
184 {
185 return isEnable_;
186 }
187
IsDeviceOnline(const std::string & device) const188 bool VirtualCommunicator::IsDeviceOnline(const std::string &device) const
189 {
190 bool res = true;
191 {
192 std::lock_guard<std::mutex> lock(devicesMapLock_);
193 if (onlineDevicesMap_.find(device) != onlineDevicesMap_.end()) {
194 res = onlineDevicesMap_[device];
195 }
196 }
197 return res;
198 }
199
~VirtualCommunicator()200 VirtualCommunicator::~VirtualCommunicator()
201 {
202 }
203
VirtualCommunicator(const std::string & deviceId,VirtualCommunicatorAggregator * communicatorAggregator)204 VirtualCommunicator::VirtualCommunicator(const std::string &deviceId,
205 VirtualCommunicatorAggregator *communicatorAggregator)
206 : deviceId_(deviceId), communicatorAggregator_(communicatorAggregator)
207 {
208 }
209
TranslateMsg(const Message * inMsg,Message * & outMsg)210 int VirtualCommunicator::TranslateMsg(const Message *inMsg, Message *&outMsg)
211 {
212 int errCode = E_OK;
213 std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
214 auto buffer = ProtocolProto::ToSerialBuffer(inMsg, errCode, extendHandle);
215 if (errCode != E_OK) {
216 return errCode;
217 }
218
219 outMsg = ProtocolProto::ToMessage(buffer, errCode);
220 if (errCode != E_OK) {
221 delete buffer;
222 buffer = nullptr;
223 }
224 return errCode;
225 }
226
SetDropMessageTypeByDevice(MessageId msgid,uint32_t dropTimes)227 void VirtualCommunicator::SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes)
228 {
229 dropMsgId_ = msgid;
230 dropMsgTimes_ = dropTimes;
231 if (msgid == UNKNOW_MESSAGE) {
232 dropMsgTimes_ = 0;
233 }
234 }
235 } // namespace DistributedDB