• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_communicator.h"
17 
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 #include "single_ver_serialize_manager.h"
21 #include "sync_engine.h"
22 #include "virtual_communicator_aggregator.h"
23 
24 namespace DistributedDB {
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)25 int VirtualCommunicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
26 {
27     std::lock_guard<std::mutex> lock(onMessageLock_);
28     onMessage_ = onMessage;
29     return E_OK;
30 }
31 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)32 int VirtualCommunicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
33 {
34     std::lock_guard<std::mutex> lock(onConnectLock_);
35     onConnect_ = onConnect;
36     return E_OK;
37 }
38 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)39 int VirtualCommunicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
40 {
41     return E_OK;
42 }
43 
Activate()44 void VirtualCommunicator::Activate()
45 {
46 }
47 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)48 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
49 {
50     return SendMessage(dstTarget, inMsg, config, nullptr);
51 }
52 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)53 int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
54     const OnSendEnd &onEnd)
55 {
56     AutoLock lock(this);
57     if (IsKilled()) {
58         return -E_OBJ_IS_KILLED;
59     }
60     if (!isEnable_) {
61         LOGD("[VirtualCommunicator] the VirtualCommunicator disabled!");
62         return -E_PERIPHERAL_INTERFACE_FAIL;
63     }
64     if (dstTarget == deviceId_) {
65         delete inMsg;
66         inMsg = nullptr;
67         return E_OK;
68     }
69     Message *message = nullptr;
70     int errCode = TranslateMsg(inMsg, message);
71     if (errCode == -E_NOT_REGISTER) {
72         communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, inMsg, onEnd);
73         return E_OK;
74     }
75     if (errCode != E_OK) {
76         return errCode;
77     }
78     delete inMsg;
79     inMsg = nullptr;
80     communicatorAggregator_->DispatchMessage(deviceId_, dstTarget, message, onEnd);
81     return E_OK;
82 }
83 
GetRemoteCommunicatorVersion(const std::string & deviceId,uint16_t & version) const84 int VirtualCommunicator::GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const
85 {
86     version = UINT16_MAX;
87     return E_OK;
88 }
89 
CallbackOnMessage(const std::string & srcTarget,Message * inMsg)90 void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Message *inMsg)
91 {
92     std::lock_guard<std::mutex> lock(onMessageLock_);
93     if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
94         (dropMsgTimes_ == 0))) {
95         onMessage_(srcTarget, inMsg);
96     } else {
97         LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
98         if (dropMsgTimes_ > 0) {
99             dropMsgTimes_--;
100         }
101         delete inMsg;
102         inMsg = nullptr;
103     }
104 }
105 
CallbackOnConnect(const std::string & target,bool isConnect) const106 void VirtualCommunicator::CallbackOnConnect(const std::string &target, bool isConnect) const
107 {
108     {
109         std::lock_guard<std::mutex> lock(devicesMapLock_);
110         if (target != deviceId_) {
111             onlineDevicesMap_[target] = isConnect;
112         }
113     }
114     std::lock_guard<std::mutex> lock(onConnectLock_);
115     if (isEnable_ && onConnect_) {
116         onConnect_(target, isConnect);
117     }
118 }
119 
GetCommunicatorMtuSize() const120 uint32_t VirtualCommunicator::GetCommunicatorMtuSize() const
121 {
122     return mtuSize_;
123 }
124 
GetCommunicatorMtuSize(const std::string & target) const125 uint32_t VirtualCommunicator::GetCommunicatorMtuSize(const std::string &target) const
126 {
127     return GetCommunicatorMtuSize();
128 }
129 
SetCommunicatorMtuSize(uint32_t mtuSize)130 void VirtualCommunicator::SetCommunicatorMtuSize(uint32_t mtuSize)
131 {
132     mtuSize_ = mtuSize;
133 }
134 
GetTimeout() const135 uint32_t VirtualCommunicator::GetTimeout() const
136 {
137     return timeout_;
138 }
139 
GetTimeout(const std::string & target) const140 uint32_t VirtualCommunicator::GetTimeout(const std::string &target) const
141 {
142     return GetTimeout();
143 }
144 
SetTimeout(uint32_t timeout)145 void VirtualCommunicator::SetTimeout(uint32_t timeout)
146 {
147     timeout_ = timeout;
148 }
149 
GetLocalIdentity(std::string & outTarget) const150 int VirtualCommunicator::GetLocalIdentity(std::string &outTarget) const
151 {
152     outTarget = deviceId_;
153     return E_OK;
154 }
155 
GeneralVirtualSyncId()156 int VirtualCommunicator::GeneralVirtualSyncId()
157 {
158     std::lock_guard<std::mutex> lock(syncIdLock_);
159     currentSyncId_++;
160     return currentSyncId_;
161 }
162 
Disable()163 void VirtualCommunicator::Disable()
164 {
165     isEnable_ = false;
166 }
167 
Enable()168 void VirtualCommunicator::Enable()
169 {
170     isEnable_ = true;
171 }
172 
SetDeviceId(const std::string & deviceId)173 void VirtualCommunicator::SetDeviceId(const std::string &deviceId)
174 {
175     deviceId_ = deviceId;
176 }
177 
GetDeviceId() const178 std::string VirtualCommunicator::GetDeviceId() const
179 {
180     return deviceId_;
181 }
182 
IsEnabled() const183 bool VirtualCommunicator::IsEnabled() const
184 {
185     return isEnable_;
186 }
187 
IsDeviceOnline(const std::string & device) const188 bool VirtualCommunicator::IsDeviceOnline(const std::string &device) const
189 {
190     bool res = true;
191     {
192         std::lock_guard<std::mutex> lock(devicesMapLock_);
193         if (onlineDevicesMap_.find(device) != onlineDevicesMap_.end()) {
194             res = onlineDevicesMap_[device];
195         }
196     }
197     return res;
198 }
199 
~VirtualCommunicator()200 VirtualCommunicator::~VirtualCommunicator()
201 {
202 }
203 
VirtualCommunicator(const std::string & deviceId,VirtualCommunicatorAggregator * communicatorAggregator)204 VirtualCommunicator::VirtualCommunicator(const std::string &deviceId,
205     VirtualCommunicatorAggregator *communicatorAggregator)
206     : deviceId_(deviceId), communicatorAggregator_(communicatorAggregator)
207 {
208 }
209 
TranslateMsg(const Message * inMsg,Message * & outMsg)210 int VirtualCommunicator::TranslateMsg(const Message *inMsg, Message *&outMsg)
211 {
212     int errCode = E_OK;
213     std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
214     auto buffer = ProtocolProto::ToSerialBuffer(inMsg, errCode, extendHandle);
215     if (errCode != E_OK) {
216         return errCode;
217     }
218 
219     outMsg = ProtocolProto::ToMessage(buffer, errCode);
220     if (errCode != E_OK) {
221         delete buffer;
222         buffer = nullptr;
223     }
224     return errCode;
225 }
226 
SetDropMessageTypeByDevice(MessageId msgid,uint32_t dropTimes)227 void VirtualCommunicator::SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes)
228 {
229     dropMsgId_ = msgid;
230     dropMsgTimes_ = dropTimes;
231     if (msgid == UNKNOW_MESSAGE) {
232         dropMsgTimes_ = 0;
233     }
234 }
235 } // namespace DistributedDB