• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 
21 namespace DistributedDB {
Communicator(CommunicatorAggregator * inCommAggregator,const LabelType & inLabel)22 Communicator::Communicator(CommunicatorAggregator *inCommAggregator, const LabelType &inLabel)
23     : commAggrHandle_(inCommAggregator), commLabel_(inLabel)
24 {
25     RefObject::IncObjRef(commAggrHandle_); // Rely on CommunicatorAggregator, hold its reference.
26 }
27 
~Communicator()28 Communicator:: ~Communicator()
29 {
30     RefObject::DecObjRef(commAggrHandle_); // Communicator no longer hold the reference of CommunicatorAggregator.
31     onMessageHandle_ = nullptr;
32     onConnectHandle_ = nullptr;
33     onSendableHandle_ = nullptr;
34     commAggrHandle_ = nullptr;
35 }
36 
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)37 int Communicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
38 {
39     std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
40     return RegCallBack(onMessage, onMessageHandle_, inOper, onMessageFinalizer_);
41 }
42 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)43 int Communicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
44 {
45     std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
46     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
47     if (onConnect && errCode == E_OK) {
48         // Register action and success
49         for (auto &entry : onlineTargets_) {
50             LOGI("[Comm][RegConnect] Label=%.6s, online target=%s{private}.", VEC_TO_STR(commLabel_), entry.c_str());
51             onConnectHandle_(entry, true);
52         }
53     }
54     return errCode;
55 }
56 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)57 int Communicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
58 {
59     std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
60     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
61 }
62 
Activate()63 void Communicator::Activate()
64 {
65     commAggrHandle_->ActivateCommunicator(commLabel_);
66 }
67 
GetCommunicatorMtuSize() const68 uint32_t Communicator::GetCommunicatorMtuSize() const
69 {
70     return commAggrHandle_->GetCommunicatorAggregatorMtuSize();
71 }
72 
GetCommunicatorMtuSize(const std::string & target) const73 uint32_t Communicator::GetCommunicatorMtuSize(const std::string &target) const
74 {
75     return commAggrHandle_->GetCommunicatorAggregatorMtuSize(target);
76 }
77 
GetLocalIdentity(std::string & outTarget) const78 int Communicator::GetLocalIdentity(std::string &outTarget) const
79 {
80     return commAggrHandle_->GetLocalIdentity(outTarget);
81 }
82 
GetTimeout() const83 uint32_t Communicator::GetTimeout() const
84 {
85     return commAggrHandle_->GetCommunicatorAggregatorTimeout();
86 }
87 
GetTimeout(const std::string & target) const88 uint32_t Communicator::GetTimeout(const std::string &target) const
89 {
90     return commAggrHandle_->GetCommunicatorAggregatorTimeout(target);
91 }
92 
IsDeviceOnline(const std::string & device) const93 bool Communicator::IsDeviceOnline(const std::string &device) const
94 {
95     return commAggrHandle_->IsDeviceOnline(device);
96 }
97 
SendMessage(const std::string & dstTarget,const Message * inMsg,SendConfig & config)98 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config)
99 {
100     return SendMessage(dstTarget, inMsg, config, nullptr);
101 }
102 
SendMessage(const std::string & dstTarget,const Message * inMsg,SendConfig & config,const OnSendEnd & onEnd)103 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config,
104     const OnSendEnd &onEnd)
105 {
106     if (dstTarget.size() == 0 || inMsg == nullptr) {
107         return -E_INVALID_ARGS;
108     }
109     std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
110     if (config.isNeedExtendHead) {
111         extendHandle = commAggrHandle_->GetExtendHeaderHandle(config.paramInfo);
112         if (extendHandle == nullptr) {
113             LOGE("[Comm][Send] get extendHandle failed");
114             return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
115         }
116     }
117     int error = E_OK;
118     // if error is not E_OK , null pointer will be returned
119     SerialBuffer *buffer = ProtocolProto::ToSerialBuffer(inMsg, error, extendHandle, false);
120     extendHandle = nullptr;
121     if (error != E_OK) {
122         LOGE("[Comm][Send] Serial fail, label=%s, error=%d.", VEC_TO_STR(commLabel_), error);
123         return error;
124     }
125     int errCode = ProtocolProto::SetDivergeHeader(buffer, commLabel_);
126     if (errCode != E_OK) {
127         LOGE("[Comm][Send] Set header fail, label=%s, errCode=%d.", VEC_TO_STR(commLabel_), errCode);
128         delete buffer;
129         buffer = nullptr;
130         return errCode;
131     }
132 
133     TaskConfig taskConfig {config.nonBlock, config.timeout, inMsg->GetPriority()};
134     errCode = commAggrHandle_->CreateSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, taskConfig, onEnd);
135     if (errCode == E_OK) {
136         // if ok, free inMsg, otherwise the caller should take over inMsg
137         delete inMsg;
138         inMsg = nullptr;
139     } else {
140         // if send fails, free buffer, otherwise buffer should be taked over by comminucator aggregator
141         delete buffer;
142         buffer = nullptr;
143     }
144     return errCode;
145 }
146 
OnBufferReceive(const std::string & srcTarget,const SerialBuffer * inBuf)147 void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf)
148 {
149     std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
150     if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) {
151         int error = E_OK;
152         // if error is not E_OK, null pointer will be returned
153         Message *message = ProtocolProto::ToMessage(inBuf, error);
154         delete inBuf;
155         inBuf = nullptr;
156         // message is not nullptr if error is E_OK or error is E_NOT_REGISTER.
157         // for the former case the message will be handled and release by sync module.
158         // for the latter case the message is released in TriggerUnknownMessageFeedback.
159         if (error != E_OK) {
160             LOGE("[Comm][Receive] ToMessage fail, label=%s, error=%d.", VEC_TO_STR(commLabel_), error);
161             if (error == -E_VERSION_NOT_SUPPORT) {
162                 TriggerVersionNegotiation(srcTarget);
163             } else if (error == -E_NOT_REGISTER) {
164                 TriggerUnknownMessageFeedback(srcTarget, message);
165             }
166             return;
167         }
168         LOGI("[Comm][Receive] label=%s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str());
169         onMessageHandle_(srcTarget, message);
170     } else {
171         LOGE("[Comm][Receive] label=%s, src.size=%zu or buf or handle invalid.", VEC_TO_STR(commLabel_),
172             srcTarget.size());
173         if (inBuf != nullptr) {
174             delete inBuf;
175             inBuf = nullptr;
176         }
177     }
178 }
179 
OnConnectChange(const std::string & target,bool isConnect)180 void Communicator::OnConnectChange(const std::string &target, bool isConnect)
181 {
182     std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
183     if (target.size() == 0) {
184         LOGE("[Comm][Connect] Target size zero, label=%s.", VEC_TO_STR(commLabel_));
185         return;
186     }
187     if (isConnect) {
188         onlineTargets_.insert(target);
189     } else {
190         onlineTargets_.erase(target);
191     }
192     LOGI("[Comm][Connect] Label=%s, target=%s{private}, Online=%d", VEC_TO_STR(commLabel_), target.c_str(), isConnect);
193     if (onConnectHandle_) {
194         onConnectHandle_(target, isConnect);
195     } else {
196         LOGI("[Comm][Connect] Handle invalid currently.");
197     }
198 }
199 
OnSendAvailable()200 void Communicator::OnSendAvailable()
201 {
202     std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
203     if (onSendableHandle_) {
204         onSendableHandle_();
205     }
206 }
207 
GetCommunicatorLabel() const208 LabelType Communicator::GetCommunicatorLabel() const
209 {
210     return commLabel_;
211 }
212 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const213 int Communicator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
214 {
215     return commAggrHandle_->GetRemoteCommunicatorVersion(target, outVersion);
216 }
217 
TriggerVersionNegotiation(const std::string & dstTarget)218 void Communicator::TriggerVersionNegotiation(const std::string &dstTarget)
219 {
220     LOGI("[Comm][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
221     int errCode = E_OK;
222     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
223     if (errCode != E_OK) {
224         LOGE("[Comm][TrigVer] Build empty frame fail, errCode=%d", errCode);
225         return;
226     }
227 
228     TaskConfig config{true, 0, Priority::HIGH};
229     errCode = commAggrHandle_->CreateSendTask(dstTarget, buffer, FrameType::EMPTY, config);
230     if (errCode != E_OK) {
231         LOGE("[Comm][TrigVer] Send empty frame fail, errCode=%d", errCode);
232         // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
233         delete buffer;
234         buffer = nullptr;
235     }
236 }
237 
TriggerUnknownMessageFeedback(const std::string & dstTarget,Message * & oriMsg)238 void Communicator::TriggerUnknownMessageFeedback(const std::string &dstTarget, Message* &oriMsg)
239 {
240     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
241         LOGI("[Comm][TrigFeedback] Do nothing for unknown message with type not request.");
242         // Do not have to do feedback if the message is not a request type message
243         delete oriMsg;
244         oriMsg = nullptr;
245         return;
246     }
247 
248     LOGI("[Comm][TrigFeedback] Do unknown message feedback with target=%s{private}.", dstTarget.c_str());
249     oriMsg->SetMessageType(TYPE_RESPONSE);
250     oriMsg->SetErrorNo(E_FEEDBACK_UNKNOWN_MESSAGE);
251 
252     int errCode = E_OK;
253     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, commLabel_, errCode);
254     delete oriMsg;
255     oriMsg = nullptr;
256     if (errCode != E_OK) {
257         LOGE("[Comm][TrigFeedback] Build unknown message feedback frame fail, errCode=%d", errCode);
258         return;
259     }
260 
261     TaskConfig config{true, 0, Priority::HIGH};
262     errCode = commAggrHandle_->CreateSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
263     if (errCode != E_OK) {
264         LOGE("[Comm][TrigFeedback] Send unknown message feedback frame fail, errCode=%d", errCode);
265         // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
266         delete buffer;
267         buffer = nullptr;
268     }
269 }
270 
271 DEFINE_OBJECT_TAG_FACILITIES(Communicator)
272 } // namespace DistributedDB
273