• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "protocol_proto.h"
20 
21 namespace DistributedDB {
Communicator(CommunicatorAggregator * inCommAggregator,const LabelType & inLabel)22 Communicator::Communicator(CommunicatorAggregator *inCommAggregator, const LabelType &inLabel)
23     : commAggrHandle_(inCommAggregator), commLabel_(inLabel), dbClosePending_(false)
24 {
25     RefObject::IncObjRef(commAggrHandle_); // Rely on CommunicatorAggregator, hold its reference.
26 }
27 
~Communicator()28 Communicator:: ~Communicator()
29 {
30     RefObject::DecObjRef(commAggrHandle_); // Communicator no longer hold the reference of CommunicatorAggregator.
31     onMessageHandle_ = nullptr;
32     onConnectHandle_ = nullptr;
33     onSendableHandle_ = nullptr;
34     commAggrHandle_ = nullptr;
35 }
36 
RegOnMessageCallback(const OnMessageCallback & onMessage,const Finalizer & inOper)37 int Communicator::RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper)
38 {
39     std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
40     return RegCallBack(onMessage, onMessageHandle_, inOper, onMessageFinalizer_);
41 }
42 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)43 int Communicator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
44 {
45     std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
46     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
47     if (onConnect && errCode == E_OK) {
48         // Register action and success
49         for (auto &entry : onlineTargets_) {
50             LOGI("[Comm][RegConnect] Label=%.3s, online target=%s{private}.", VEC_TO_STR(commLabel_), entry.c_str());
51             onConnectHandle_(entry, true);
52         }
53     }
54     return errCode;
55 }
56 
RegOnSendableCallback(const std::function<void (void)> & onSendable,const Finalizer & inOper)57 int Communicator::RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper)
58 {
59     std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
60     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
61 }
62 
Activate(const std::string & userId)63 void Communicator::Activate(const std::string &userId)
64 {
65     commAggrHandle_->ActivateCommunicator(commLabel_, userId);
66 }
67 
GetCommunicatorMtuSize() const68 uint32_t Communicator::GetCommunicatorMtuSize() const
69 {
70     return commAggrHandle_->GetCommunicatorAggregatorMtuSize();
71 }
72 
GetCommunicatorMtuSize(const std::string & target) const73 uint32_t Communicator::GetCommunicatorMtuSize(const std::string &target) const
74 {
75     return commAggrHandle_->GetCommunicatorAggregatorMtuSize(target);
76 }
77 
GetLocalIdentity(std::string & outTarget) const78 int Communicator::GetLocalIdentity(std::string &outTarget) const
79 {
80     return commAggrHandle_->GetLocalIdentity(outTarget);
81 }
82 
GetTimeout() const83 uint32_t Communicator::GetTimeout() const
84 {
85     return commAggrHandle_->GetCommunicatorAggregatorTimeout();
86 }
87 
GetTimeout(const std::string & target) const88 uint32_t Communicator::GetTimeout(const std::string &target) const
89 {
90     return commAggrHandle_->GetCommunicatorAggregatorTimeout(target);
91 }
92 
IsDeviceOnline(const std::string & device) const93 bool Communicator::IsDeviceOnline(const std::string &device) const
94 {
95     return commAggrHandle_->IsDeviceOnline(device);
96 }
97 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config)98 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config)
99 {
100     return SendMessage(dstTarget, inMsg, config, nullptr);
101 }
102 
SendMessage(const std::string & dstTarget,const Message * inMsg,const SendConfig & config,const OnSendEnd & onEnd)103 int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
104     const OnSendEnd &onEnd)
105 {
106     if (dstTarget.empty() || inMsg == nullptr) {
107         return -E_INVALID_ARGS;
108     }
109     std::shared_ptr<ExtendHeaderHandle> extendHandle = nullptr;
110     if (config.isNeedExtendHead) {
111         extendHandle = commAggrHandle_->GetExtendHeaderHandle(config.paramInfo);
112         if (extendHandle == nullptr) {
113             LOGE("[Comm][Send] get extendHandle failed");
114             return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
115         }
116     }
117     int error = E_OK;
118     // if error is not E_OK , null pointer will be returned
119     SerialBuffer *buffer = ProtocolProto::ToSerialBuffer(inMsg, extendHandle, false, error);
120     extendHandle = nullptr;
121     if (error != E_OK) {
122         LOGE("[Comm][Send] Serial fail, label=%.3s, error=%d.", VEC_TO_STR(commLabel_), error);
123         return error;
124     }
125     int errCode = ProtocolProto::SetDivergeHeader(buffer, commLabel_);
126     if (errCode != E_OK) {
127         LOGE("[Comm][Send] Set header fail, label=%.3s, errCode=%d.", VEC_TO_STR(commLabel_), errCode);
128         delete buffer;
129         buffer = nullptr;
130         return errCode;
131     }
132 
133     TaskConfig taskConfig {config.nonBlock, config.isRetryTask, config.timeout, inMsg->GetPriority()};
134     taskConfig.infos = {config.paramInfo.appId, config.paramInfo.storeId, config.paramInfo.userId,
135         config.paramInfo.subUserId};
136     errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, taskConfig, onEnd);
137     if (errCode == E_OK) {
138         // if ok, free inMsg, otherwise the caller should take over inMsg
139         delete inMsg;
140         inMsg = nullptr;
141     } else {
142         // if send fails, free buffer, otherwise buffer should be taked over by comminucator aggregator
143         delete buffer;
144         buffer = nullptr;
145     }
146     return errCode;
147 }
148 
OnBufferReceive(const std::string & srcTarget,const SerialBuffer * inBuf,const std::string & sendUser,uint16_t remoteDbVersion)149 int Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf,
150     const std::string &sendUser, uint16_t remoteDbVersion)
151 {
152     std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
153     if (!srcTarget.empty() && inBuf != nullptr && onMessageHandle_) {
154         int error = E_OK;
155         // if error is not E_OK, null pointer will be returned
156         Message *message = ProtocolProto::ToMessage(inBuf, error);
157         // message is not nullptr if error is E_OK or error is E_NOT_REGISTER.
158         // for the former case the message will be handled and release by sync module.
159         // for the latter case the message is released in TriggerUnknownMessageFeedback.
160         if (error != E_OK) {
161             LOGE("[Comm][Receive] ToMessage fail, label=%.3s, error=%d.", VEC_TO_STR(commLabel_), error);
162             delete inBuf;
163             inBuf = nullptr;
164             if (error == -E_VERSION_NOT_SUPPORT) {
165                 TriggerVersionNegotiation(srcTarget);
166             } else if (error == -E_NOT_REGISTER) {
167                 TriggerUnknownMessageFeedback(srcTarget, message);
168             }
169             return E_OK;
170         }
171         message->SetRemoteSoftwareVersion(DBCommon::TransfDbVersionToSoftwareVersion(remoteDbVersion));
172         if (message->IsSupportFeedDbClosing() && ExchangeClosePending(false)) {
173             delete message;
174             message = nullptr;
175             LOGW("[Comm][Receive] db closing label=%.3s", VEC_TO_STR(commLabel_));
176             return -E_FEEDBACK_DB_CLOSING;
177         }
178         message->SetSenderUserId(sendUser);
179         LOGI("[Comm][Receive] label=%.3s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str());
180         error = onMessageHandle_(srcTarget, message);
181         if (error == -E_FEEDBACK_DB_CLOSING) {
182             return -E_FEEDBACK_DB_CLOSING;
183         }
184         delete inBuf;
185         inBuf = nullptr;
186         return E_OK;
187     } else {
188         LOGE("[Comm][Receive] label=%.3s, src.size=%zu or buf or handle invalid.", VEC_TO_STR(commLabel_),
189             srcTarget.size());
190         if (inBuf != nullptr) {
191             delete inBuf;
192             inBuf = nullptr;
193         }
194     }
195     return E_OK;
196 }
197 
OnConnectChange(const std::string & target,bool isConnect)198 void Communicator::OnConnectChange(const std::string &target, bool isConnect)
199 {
200     std::lock_guard<std::mutex> connectHandleLockGuard(connectHandleMutex_);
201     if (target.size() == 0) {
202         LOGE("[Comm][Connect] Target size zero, label=%.3s.", VEC_TO_STR(commLabel_));
203         return;
204     }
205     if (isConnect) {
206         onlineTargets_.insert(target);
207     } else {
208         onlineTargets_.erase(target);
209     }
210     LOGI("[Comm][Connect] Label=%.3s, target=%s{private}, Online=%d", VEC_TO_STR(commLabel_), target.c_str(),
211         isConnect);
212     if (onConnectHandle_) {
213         onConnectHandle_(target, isConnect);
214     } else {
215         LOGI("[Comm][Connect] Handle invalid currently.");
216     }
217 }
218 
OnSendAvailable()219 void Communicator::OnSendAvailable()
220 {
221     std::lock_guard<std::mutex> sendableHandleLockGuard(sendableHandleMutex_);
222     if (onSendableHandle_) {
223         onSendableHandle_();
224     }
225 }
226 
GetCommunicatorLabel() const227 LabelType Communicator::GetCommunicatorLabel() const
228 {
229     return commLabel_;
230 }
231 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const232 int Communicator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
233 {
234     return commAggrHandle_->GetRemoteCommunicatorVersion(target, outVersion);
235 }
236 
TriggerVersionNegotiation(const std::string & dstTarget)237 void Communicator::TriggerVersionNegotiation(const std::string &dstTarget)
238 {
239     LOGI("[Comm][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
240     int errCode = E_OK;
241     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
242     if (errCode != E_OK) {
243         LOGE("[Comm][TrigVer] Build empty frame fail, errCode=%d", errCode);
244         return;
245     }
246 
247     TaskConfig config{true, true, 0, Priority::HIGH};
248     errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
249     if (errCode != E_OK) {
250         LOGE("[Comm][TrigVer] Send empty frame fail, errCode=%d", errCode);
251         // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
252         delete buffer;
253         buffer = nullptr;
254     }
255 }
256 
TriggerUnknownMessageFeedback(const std::string & dstTarget,Message * & oriMsg)257 void Communicator::TriggerUnknownMessageFeedback(const std::string &dstTarget, Message* &oriMsg)
258 {
259     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
260         LOGI("[Comm][TrigFeedback] Do nothing for unknown message with type not request.");
261         // Do not have to do feedback if the message is not a request type message
262         delete oriMsg;
263         oriMsg = nullptr;
264         return;
265     }
266 
267     LOGI("[Comm][TrigFeedback] Do unknown message feedback with target=%s{private}.", dstTarget.c_str());
268     oriMsg->SetMessageType(TYPE_RESPONSE);
269     oriMsg->SetErrorNo(E_FEEDBACK_UNKNOWN_MESSAGE);
270 
271     int errCode = E_OK;
272     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, commLabel_, errCode);
273     delete oriMsg;
274     oriMsg = nullptr;
275     if (errCode != E_OK) {
276         LOGE("[Comm][TrigFeedback] Build unknown message feedback frame fail, errCode=%d", errCode);
277         return;
278     }
279 
280     TaskConfig config{true, true, 0, Priority::HIGH};
281     errCode = commAggrHandle_->ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
282     if (errCode != E_OK) {
283         LOGE("[Comm][TrigFeedback] Send unknown message feedback frame fail, errCode=%d", errCode);
284         // if send fails, free buffer, otherwise buffer will be taked over by comminucator aggregator
285         delete buffer;
286         buffer = nullptr;
287     }
288 }
289 
GetTargetUserId(const ExtendInfo & paramInfo) const290 std::string Communicator::GetTargetUserId(const ExtendInfo &paramInfo) const
291 {
292     std::shared_ptr<ExtendHeaderHandle> extendHandle = commAggrHandle_->GetExtendHeaderHandle(paramInfo);
293     if (extendHandle == nullptr) {
294         return DBConstant::DEFAULT_USER;
295     }
296     return extendHandle->GetTargetUserId();
297 }
298 
ExchangeClosePending(bool expected)299 bool Communicator::ExchangeClosePending(bool expected)
300 {
301     bool curVal = !expected;
302     return dbClosePending_.compare_exchange_strong(curVal, expected);
303 }
// Project macro applied to Communicator; its definition is not visible in this file.
// NOTE(review): presumably expands to the object-tag helpers paired with a declaration in communicator.h - confirm.
304 DEFINE_OBJECT_TAG_FACILITIES(Communicator)
305 } // namespace DistributedDB
306