• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_communicator.h"
17 
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 #include "single_ver_serialize_manager.h"
21 #include "sync_engine.h"
22 #include "virtual_communicator_aggregator.h"
23 
24 namespace DistributedDB {
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)25 int VirtualCommunicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
26 {
27     std::lock_guard<std::mutex> lock(onMessageLock_);
28     onMessage_ = onMessage;
29     return E_OK;
30 }
31 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)32 int VirtualCommunicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
33 {
34     std::lock_guard<std::mutex> lock(onConnectLock_);
35     onConnect_ = onConnect;
36     return E_OK;
37 }
38 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)39 int VirtualCommunicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
40 {
41     return E_OK;
42 }
43 
Activate(const std::string & userId)44 void VirtualCommunicator::Activate(const std::string &userId)
45 {
46 }
47 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)48 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
49 {
50     return SendMessage(dstTarget, inMsg, config, nullptr);
51 }
52 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)53 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
54     const OnSendEnd &onEnd)
55 {
56     AutoLock lock(this);
57     if (IsKilled()) {
58         return -E_OBJ_IS_KILLED;
59     }
60     if (!isEnable_) {
61         LOGD("[VirtualCommunicator] the VirtualCommunicator disabled!");
62         return -E_PERIPHERAL_INTERFACE_FAIL;
63     }
64     if (dstTarget == deviceId_) {
65         delete inMsg;
66         inMsg = nullptr;
67         return E_OK;
68     }
69     Message *message = nullptr;
70     int errCode = TranslateMsg(inMsg, message);
71     if (errCode == -E_NOT_REGISTER) {
72         communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, inMsg, onEnd);
73         return E_OK;
74     }
75     if (errCode != E_OK) {
76         return errCode;
77     }
78     delete inMsg;
79     inMsg = nullptr;
80     communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, message, onEnd);
81     return E_OK;
82 }
83 
SetRemoteVersion(uint16_t remoteVersion)84 void VirtualCommunicator::SetRemoteVersion(uint16_t remoteVersion)
85 {
86     remoteVersion_ = remoteVersion;
87 }
88 
GetRemoteCommunicatorVersion(const std::string & deviceId,uint16_t & version) const89 int VirtualCommunicator::GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const
90 {
91     version = remoteVersion_;
92     return E_OK;
93 }
94 
CallbackOnMessage(const std::string & srcTarget,Message * inMsg)95 void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg)
96 {
97     std::lock_guard<std::mutex> lock(onMessageLock_);
98     if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
99         (dropMsgTimes_ == 0))) {
100         onMessage_(srcTarget, inMsg);
101     } else {
102         LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
103         if (dropMsgTimes_ > 0) {
104             dropMsgTimes_--;
105         }
106         delete inMsg;
107         inMsg = nullptr;
108     }
109 }
110 
CallbackOnConnect(const std::string & target,bool isConnect) const111 void VirtualCommunicator::CallbackOnConnect(const std::string &target, bool isConnect) const
112 {
113     {
114         std::lock_guard<std::mutex> lock(devicesMapLock_);
115         if (target != deviceId_) {
116             onlineDevicesMap_[target] = isConnect;
117         }
118     }
119     std::lock_guard<std::mutex> lock(onConnectLock_);
120     if (isEnable_ && onConnect_) {
121         onConnect_(target, isConnect);
122     }
123 }
124 
GetCommunicatorMtuSize() const125 uint32_t VirtualCommunicator::GetCommunicatorMtuSize() const
126 {
127     return mtuSize_;
128 }
129 
GetCommunicatorMtuSize(const std::string & target) const130 uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) const
131 {
132     return GetCommunicatorMtuSize();
133 }
134 
SetCommunicatorMtuSize(uint32_t mtuSize)135 void VirtualCommunicator::SetCommunicatorMtuSize(uint32_t mtuSize)
136 {
137     mtuSize_ = mtuSize;
138 }
139 
GetTimeout() const140 uint32_t VirtualCommunicator::GetTimeout() const
141 {
142     LOGD("[VirtualCommunicator] Get timeout %" PRIu32, timeout_);
143     return timeout_;
144 }
145 
GetTimeout(const std::string & target) const146 uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const
147 {
148     return GetTimeout();
149 }
150 
SetTimeout(uint32_t timeout)151 void VirtualCommunicator::SetTimeout(uint32_t timeout)
152 {
153     timeout_ = timeout;
154 }
155 
GetLocalIdentity(std::string & outTarget) const156 int VirtualCommunicator::GetLocalIdentity(std::string &outTarget) const
157 {
158     outTarget = deviceId_;
159     return E_OK;
160 }
161 
GeneralVirtualSyncId()162 int VirtualCommunicator::GeneralVirtualSyncId()
163 {
164     std::lock_guard<std::mutex> lock(syncIdLock_);
165     currentSyncId_++;
166     return currentSyncId_;
167 }
168 
Disable()169 void VirtualCommunicator::Disable()
170 {
171     isEnable_ = false;
172 }
173 
Enable()174 void VirtualCommunicator::Enable()
175 {
176     isEnable_ = true;
177 }
178 
SetDeviceId(const std::string & deviceId)179 void VirtualCommunicator::SetDeviceId(const std::string &deviceId)
180 {
181     deviceId_ = deviceId;
182 }
183 
GetDeviceId() const184 std::string VirtualCommunicator::GetDeviceId() const
185 {
186     return deviceId_;
187 }
188 
IsEnabled() const189 bool VirtualCommunicator::IsEnabled() const
190 {
191     return isEnable_;
192 }
193 
IsDeviceOnline(const std::string & device) const194 bool VirtualCommunicator::IsDeviceOnline(const std::string &device) const
195 {
196     bool res = true;
197     {
198         std::lock_guard<std::mutex> lock(devicesMapLock_);
199         if (onlineDevicesMap_.find(device) != onlineDevicesMap_.end()) {
200             res = onlineDevicesMap_[device];
201         }
202     }
203     return res;
204 }
205 
~VirtualCommunicator()206 VirtualCommunicator::~VirtualCommunicator()
207 {
208 }
209 
VirtualCommunicator(const std::string & deviceId,VirtualCommunicatorAggregator * communicatorAggregator)210 VirtualCommunicator::VirtualCommunicator(const std::string &deviceId,
211     VirtualCommunicatorAggregator *communicatorAggregator)
212     : deviceId_(deviceId), communicatorAggregator_(communicatorAggregator), targetUserId_(DBConstant::DEFAULT_USER),
213     dbClosePending_(false)
214 {
215 }
216 
TranslateMsg(const Message * inMsg,Message * & outMsg)217 int VirtualCommunicator::TranslateMsg(const Message *inMsg, Message *&outMsg)
218 {
219     int errCode = E_OK;
220     std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
221     auto buffer = ProtocolProto::ToSerialBuffer(inMsg, extendHandle, false, errCode);
222     if (errCode != E_OK) {
223         return errCode;
224     }
225     sendMsgSize_ += buffer->GetSize();
226     outMsg = ProtocolProto::ToMessage(buffer, errCode);
227     delete buffer;
228     buffer = nullptr;
229     return errCode;
230 }
231 
SetDropMessageTypeByDevice(MessageId msgid,uint32_t dropTimes)232 void VirtualCommunicator::SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes)
233 {
234     dropMsgId_ = msgid;
235     dropMsgTimes_ = dropTimes;
236     if (msgid == UNKNOW_MESSAGE) {
237         dropMsgTimes_ = 0;
238     }
239 }
240 
GetTargetUserId(const ExtendInfo & paramInfo) const241 std::string VirtualCommunicator::GetTargetUserId(const ExtendInfo &paramInfo) const
242 {
243     return targetUserId_;
244 }
245 
SetTargetUserId(const std::string & userId)246 void VirtualCommunicator::SetTargetUserId(const std::string &userId)
247 {
248     targetUserId_ = userId;
249 }
250 
GetSendMsgSize() const251 uint64_t VirtualCommunicator::GetSendMsgSize() const
252 {
253     return sendMsgSize_;
254 }
255 
ExchangeClosePending(bool expected)256 bool VirtualCommunicator::ExchangeClosePending(bool expected)
257 {
258     bool curVal = !expected;
259     return dbClosePending_.compare_exchange_strong(curVal, expected);
260 }
261 } // namespace DistributedDB