1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_linker.h"
17 #include "hash.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "protocol_proto.h"
21 #include "platform_specific.h"
22 #include "communicator_aggregator.h"
23
24 namespace DistributedDB {
namespace {
// Base waiting time, in milliseconds, before an unacknowledged LabelExchange is sent again.
constexpr uint32_t TIME_LAPSE_FOR_WAITING_ACK = 5000; // 5s
// Waiting time, in milliseconds, before retrying when creating the send task failed.
constexpr uint32_t TIME_LAPSE_FOR_RETRY_SEND = 1000; // 1s
// Currently at most 20 retransmissions are done for one LabelExchange if no ack is received.
constexpr uint32_t RETRANSMIT_LIMIT = 20;
// The first 5 retransmissions use an equal interval; later ones use a growing interval.
constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5;
} // anonymous namespace
31
CommunicatorLinker(CommunicatorAggregator * inAggregator)32 CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator)
33 : incSequenceId_(0), incAckTriggerId_(0)
34 {
35 aggregator_ = inAggregator;
36 RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator
37 }
38
~CommunicatorLinker()39 CommunicatorLinker::~CommunicatorLinker()
40 {
41 RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator
42 aggregator_ = nullptr;
43 }
44
Initialize()45 void CommunicatorLinker::Initialize()
46 {
47 uint64_t curTime = 0;
48 int errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
49 if (errCode != E_OK) {
50 LOGW("[Linker][Init] Get systime fail, use default, errCode=%d.", errCode);
51 }
52 std::string curTimeStr = std::to_string(curTime);
53 localDistinctValue_ = Hash::HashFunc(curTimeStr);
54 LOGI("[Linker][Init] curTime=%llu, distinct=%llu.", ULL(curTime), ULL(localDistinctValue_));
55 }
56
57 // Create async task to send out label_exchange and waiting for label_exchange_ack.
58 // If waiting timeout, pass the send&wait task to overrall timing retry task.
TargetOnline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)59 int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
60 {
61 {
62 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
63 // if inTarget is offline before, use the remembered previous online labels to decide which communicator to be
64 // notified online. Such handling is in case for abnormal unilateral offline, which A and B is notified online
65 // mutually, then B is notified A offline and for a while B is notified A online again, but A feels no notify.
66 if (remoteOnlineTarget_.count(inTarget) == 0) {
67 outRelatedLabels = targetMapOnlineLabels_[inTarget];
68 remoteOnlineTarget_.insert(inTarget);
69 }
70 }
71 return TriggerLabelExchangeEvent(inTarget);
72 }
73
74 // Clear all labels related to this target. Let no longer waiting for ack of this target.
75 // The caller should notify all related communicator about this target offline.
TargetOffline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)76 int CommunicatorLinker::TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
77 {
78 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
79 outRelatedLabels = targetMapOnlineLabels_[inTarget];
80 // Do not erase the Labels of inTarget from targetMapOnlineLabels_, remember it for using when TargetOnline
81 remoteOnlineTarget_.erase(inTarget);
82 // Note: The process of remote target may quit, when remote target restart,
83 // the distinctValue of this remote target may be changed, and the sequenceId may start from zero
84 targetDistinctValue_.erase(inTarget);
85 topRecvLabelSeq_.erase(inTarget);
86 return E_OK;
87 }
88
89 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
90 // If waiting timeout, pass the send&wait task to overrall timing retry task.
91 // Find out targets for this label that is already online.
92 // The caller should notify communicator of this label about already online target.
IncreaseLocalLabel(const LabelType & inLabel,std::set<std::string> & outOnlineTarget)93 int CommunicatorLinker::IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget)
94 {
95 std::set<std::string> totalOnlineTargets;
96 {
97 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
98 localOnlineLabels_.insert(inLabel);
99 totalOnlineTargets = remoteOnlineTarget_;
100 for (auto &entry : targetMapOnlineLabels_) {
101 if (remoteOnlineTarget_.count(entry.first) == 0) { // Ignore offline target
102 continue;
103 }
104 if (entry.second.count(inLabel) != 0) { // This online target had opened then same Label
105 outOnlineTarget.insert(entry.first);
106 }
107 }
108 }
109 bool everFail = false;
110 for (const auto &entry : totalOnlineTargets) {
111 int errCode = TriggerLabelExchangeEvent(entry);
112 if (errCode != E_OK) {
113 everFail = true;
114 }
115 }
116 return everFail ? -E_INTERNAL_ERROR : E_OK;
117 }
118
119 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
120 // If waiting timeout, pass the send&wait task to overrall timing retry task.
DecreaseLocalLabel(const LabelType & inLabel)121 int CommunicatorLinker::DecreaseLocalLabel(const LabelType &inLabel)
122 {
123 std::set<std::string> totalOnlineTargets;
124 {
125 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
126 localOnlineLabels_.erase(inLabel);
127 totalOnlineTargets = remoteOnlineTarget_;
128 }
129 bool everFail = false;
130 for (const auto &entry : totalOnlineTargets) {
131 int errCode = TriggerLabelExchangeEvent(entry);
132 if (errCode != E_OK) {
133 everFail = true;
134 }
135 }
136 return everFail ? -E_INTERNAL_ERROR : E_OK;
137 }
138
139 // Compare the latest labels with previous Label, find out label changes.
140 // The caller should notify the target changes according to label changes.
141 // Update the online labels of this target. Send out label_exchange_ack.
ReceiveLabelExchange(const std::string & inTarget,const std::set<LabelType> & inLatestLabels,uint64_t inDistinctValue,uint64_t inSequenceId,std::map<LabelType,bool> & outChangeLabels)142 int CommunicatorLinker::ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels,
143 uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels)
144 {
145 {
146 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
147 DetectDistinctValueChange(inTarget, inDistinctValue);
148 if (topRecvLabelSeq_.count(inTarget) == 0) {
149 // Firstly receive LabelExchange from this target
150 topRecvLabelSeq_[inTarget] = inSequenceId;
151 } else if (inSequenceId < topRecvLabelSeq_[inTarget]) {
152 // inSequenceId can be equal to topRecvLabelSeq, in this case, the ack of this sequence send to this target
153 // may be lost, this target resend LabelExchange, and we should resend ack to this target
154 LOGW("[Linker][RecvLabel] inSequenceId=%llu smaller than topRecvLabelSeq=%llu. Frame Ignored.",
155 ULL(inSequenceId), ULL(topRecvLabelSeq_[inTarget]));
156 return -E_OUT_OF_DATE;
157 } else {
158 // Update top sequenceId of received LabelExchange
159 topRecvLabelSeq_[inTarget] = inSequenceId;
160 }
161 // Find out online labels by check difference
162 for (auto &entry : inLatestLabels) {
163 if (targetMapOnlineLabels_[inTarget].count(entry) == 0) {
164 outChangeLabels[entry] = true;
165 }
166 }
167 // Find out offline labels by check difference
168 for (const auto &entry : targetMapOnlineLabels_[inTarget]) {
169 if (inLatestLabels.count(entry) == 0) {
170 outChangeLabels[entry] = false;
171 }
172 }
173 // Update target online labels
174 targetMapOnlineLabels_[inTarget] = inLatestLabels;
175 }
176 // Trigger sending ack
177 int errCode = TriggerLabelExchangeAckEvent(inTarget, inSequenceId);
178 if (errCode != E_OK) {
179 LOGE("[Linker][RecvLabel] TriggerAckEvent Fail, Just Log, errCode=%d.", errCode);
180 // Do not return error here
181 }
182 return E_OK;
183 }
184
185 // Waiting finish if the ack is what linker wait by check inSequenceId
186 // Similarly, stop the retry task of this Target.
ReceiveLabelExchangeAck(const std::string & inTarget,uint64_t inDistinctValue,uint64_t inSequenceId)187 int CommunicatorLinker::ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue,
188 uint64_t inSequenceId)
189 {
190 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
191 DetectDistinctValueChange(inTarget, inDistinctValue);
192 // This two judge is for detecting case that local device process restart so incSequenceId_ restart from 0
193 // The remote device may send an ack cause by previous process, which may destroy the functionality of this process
194 if (waitAckSeq_.count(inTarget) == 0) {
195 LOGW("[Linker][RecvAck] Not waiting any ack now, inSequenceId=%llu", ULL(inSequenceId));
196 return -E_NOT_FOUND;
197 }
198 if (waitAckSeq_[inTarget] < inSequenceId) {
199 LOGW("[Linker][RecvAck] Not waiting this ack now, inSequenceId=%llu, waitAckSeq_=%llu",
200 ULL(inSequenceId), ULL(waitAckSeq_[inTarget]));
201 return -E_NOT_FOUND;
202 }
203 // An valid ack received
204 if (recvAckSeq_.count(inTarget) == 0) {
205 // Firstly receive LabelExchangeAck from this target
206 recvAckSeq_[inTarget] = inSequenceId;
207 } else if (inSequenceId <= recvAckSeq_[inTarget]) {
208 LOGW("[Linker][RecvAck] inSequenceId=%llu not greater than recvAckSeq_=%llu. Frame Ignored.",
209 ULL(inSequenceId), ULL(recvAckSeq_[inTarget]));
210 return -E_OUT_OF_DATE;
211 } else {
212 // Update top sequenceId of received LabelExchangeAck
213 recvAckSeq_[inTarget] = inSequenceId;
214 }
215 return E_OK;
216 }
217
GetOnlineRemoteTarget() const218 std::set<std::string> CommunicatorLinker::GetOnlineRemoteTarget() const
219 {
220 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
221 return remoteOnlineTarget_;
222 }
223
IsRemoteTargetOnline(const std::string & inTarget) const224 bool CommunicatorLinker::IsRemoteTargetOnline(const std::string &inTarget) const
225 {
226 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
227 if (remoteOnlineTarget_.count(inTarget) != 0) {
228 return true;
229 }
230 return false;
231 }
232
233 // inCountDown is in millisecond
SuspendByOnceTimer(const std::function<void (void)> & inAction,uint32_t inCountDown)234 void CommunicatorLinker::SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown)
235 {
236 TimerId thisTimerId = 0;
237 RuntimeContext *context = RuntimeContext::GetInstance();
238 int errCode = context->SetTimer(static_cast<int>(inCountDown), [inAction](TimerId inTimerId)->int{
239 // Note: inAction should be captured by value (must not by reference)
240 LOGI("[Linker][Suspend] Timer Due : inTimerId=%llu.", ULL(inTimerId));
241 inAction();
242 return -E_END_TIMER;
243 }, nullptr, thisTimerId);
244 if (errCode == E_OK) {
245 LOGI("[Linker][Suspend] SetTimer Success : thisTimerId=%llu, wait=%u(ms).", ULL(thisTimerId), inCountDown);
246 } else {
247 LOGI("[Linker][Suspend] SetTimer Fail Raise Thread Instead : errCode=%d, wait=%u(ms).", errCode, inCountDown);
248 std::thread timerThread([inAction, inCountDown]() {
249 // Note: inAction and inCountDown should be captured by value (must not by reference)
250 std::this_thread::sleep_for(std::chrono::milliseconds(inCountDown));
251 inAction();
252 });
253 timerThread.detach();
254 }
255 }
256
257 // This function should be called under protection of entireInfoMutex_
DetectDistinctValueChange(const std::string & inTarget,uint64_t inDistinctValue)258 void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue)
259 {
260 // Firstly received distinctValue from this target ever or after offline
261 if (targetDistinctValue_.count(inTarget) == 0) {
262 targetDistinctValue_.try_emplace(inTarget, inDistinctValue);
263 return;
264 }
265
266 // DistinctValue is the same as before
267 if (targetDistinctValue_[inTarget] == inDistinctValue) {
268 return;
269 }
270
271 // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component.
272 LOGE("[Linker][Detect] ######## DISTINCT VALUE CHANGE DETECTED : %llu VS %llu ########",
273 ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget]));
274 targetDistinctValue_[inTarget] = inDistinctValue;
275 // The process of remote target must have undergone a quit and restart, the remote sequenceId will start from zero.
276 topRecvLabelSeq_.erase(inTarget);
277 RefObject::IncObjRef(this);
278 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, inTarget]() {
279 LOGD("ReTrigger label exchange because remote process restarted!");
280 this->TriggerLabelExchangeEvent(inTarget);
281 RefObject::DecObjRef(this);
282 });
283 if (errCode != E_OK) {
284 LOGD("ReTrigger label exchange failed! errCode = %d", errCode);
285 RefObject::DecObjRef(this);
286 }
287 }
288
TriggerLabelExchangeEvent(const std::string & toTarget)289 int CommunicatorLinker::TriggerLabelExchangeEvent(const std::string &toTarget)
290 {
291 // Apply for a latest sequenceId
292 uint64_t sequenceId = incSequenceId_.fetch_add(1, std::memory_order_seq_cst);
293 // Get a snapshot of current online labels
294 std::set<LabelType> onlineLabels;
295 {
296 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
297 onlineLabels = localOnlineLabels_;
298 }
299 // Build LabelExchange Frame
300 int error = E_OK;
301 SerialBuffer *buffer = ProtocolProto::BuildLabelExchange(localDistinctValue_, sequenceId, onlineLabels, error);
302 if (error != E_OK) {
303 LOGE("[Linker][TriggerLabel] BuildLabel fail, error=%d", error);
304 return error;
305 }
306 // Update waitAckSeq, Check whether new event be triggered in other thread
307 {
308 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
309 if (waitAckSeq_.count(toTarget) == 0) {
310 // Firstly send LabelExchange to this target
311 waitAckSeq_[toTarget] = sequenceId;
312 } else if (waitAckSeq_[toTarget] > sequenceId) {
313 // New LabelExchangeEvent had been trigger for this target, so this event can be abort
314 LOGI("[Linker][TriggerLabel] Detect newSeqId=%llu than thisSeqId=%llu be triggered for target=%s{private}",
315 ULL(waitAckSeq_[toTarget]), ULL(sequenceId), toTarget.c_str());
316 delete buffer;
317 buffer = nullptr;
318 return E_OK;
319 } else {
320 waitAckSeq_[toTarget] = sequenceId;
321 }
322 }
323 // Synchronously call SendLabelExchange and hand over buffer to it
324 RefObject::IncObjRef(this); // SendLabelExchange will only DecRef when total done if no need to send
325 SendLabelExchange(toTarget, buffer, sequenceId, 0); // Initially retransmitCount is 0
326 return E_OK;
327 }
328
TriggerLabelExchangeAckEvent(const std::string & toTarget,uint64_t inSequenceId)329 int CommunicatorLinker::TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId)
330 {
331 // Build LabelExchangeAck Frame
332 int errCode = E_OK;
333 SerialBuffer *buffer = ProtocolProto::BuildLabelExchangeAck(localDistinctValue_, inSequenceId, errCode);
334 if (errCode != E_OK) {
335 LOGE("[Linker][TriggerAck] BuildAck fail, error=%d", errCode);
336 return errCode;
337 }
338 // Apply for a latest ackId and update ackTriggerId_
339 uint64_t ackId;
340 {
341 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
342 ackId = incAckTriggerId_.fetch_add(1, std::memory_order_seq_cst);
343 ackTriggerId_[toTarget] = ackId;
344 }
345 // Synchronously call SendLabelExchangeAck and hand over buffer to it
346 RefObject::IncObjRef(this); // SendLabelExchangeAck will only DecRef when total done if no need to send
347 SendLabelExchangeAck(toTarget, buffer, inSequenceId, ackId);
348 return E_OK;
349 }
350
351 namespace {
GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)352 inline uint32_t GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)
353 {
354 if (inRetransmitCount <= RETRANSMIT_LIMIT_EQUAL_INTERVAL) {
355 return TIME_LAPSE_FOR_WAITING_ACK;
356 }
357 uint32_t subsequentRetransmit = inRetransmitCount - RETRANSMIT_LIMIT_EQUAL_INTERVAL;
358 return subsequentRetransmit * subsequentRetransmit * TIME_LAPSE_FOR_WAITING_ACK;
359 }
360 }
361
SendLabelExchange(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint32_t inRetransmitCount)362 void CommunicatorLinker::SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId,
363 uint32_t inRetransmitCount)
364 {
365 // Check whether have the need to send
366 bool noNeedToSend = ((inRetransmitCount <= RETRANSMIT_LIMIT) ? false : true);
367 {
368 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
369 if (remoteOnlineTarget_.count(toTarget) == 0) {
370 // Target offline
371 noNeedToSend = true;
372 }
373 if (waitAckSeq_[toTarget] > inSequenceId) {
374 // New LabelExchangeEvent had been trigger for this target, so this event can be abort
375 noNeedToSend = true;
376 }
377 if (recvAckSeq_.count(toTarget) != 0 && recvAckSeq_[toTarget] >= inSequenceId) {
378 // Ack of this sequenceId had been received or even later ack had been received
379 noNeedToSend = true;
380 }
381 if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
382 LOGI("[Linker][SendLabel] NoNeedSend:target=%s{private}, thisSeqId=%llu, waitAckSeq=%llu, recvAckSeq=%llu,"
383 "retrans=%u.", toTarget.c_str(), ULL(inSequenceId), ULL(waitAckSeq_[toTarget]),
384 ULL((recvAckSeq_.count(toTarget) != 0) ? recvAckSeq_[toTarget] : ~ULL(0)), inRetransmitCount);
385 } // ~0 indicate no ack ever recv
386 }
387 if (noNeedToSend) {
388 delete inBuff;
389 inBuff = nullptr;
390 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
391 return;
392 }
393
394 int error = E_OK;
395 SerialBuffer *cloneBuffer = inBuff->Clone(error);
396 TaskConfig config{true, 0, Priority::HIGH};
397 int errCode = aggregator_->CreateSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE, config);
398 if (errCode == E_OK) {
399 // Send ok, go on to wait ack, and maybe resend
400 if (error == E_OK) {
401 SuspendByOnceTimer([this, toTarget, cloneBuffer, inSequenceId, inRetransmitCount]() {
402 // Note: toTarget and cloneBuffer and inSequenceId should be captured by value (must not by reference)
403 SendLabelExchange(toTarget, cloneBuffer, inSequenceId, inRetransmitCount + 1); // Do retransmission
404 }, GetDynamicTimeLapseForWaitingAck(inRetransmitCount));
405 } else {
406 LOGE("[Linker][SendLabel] CloneFail: target=%s{private}, SeqId=%llu.", toTarget.c_str(), ULL(inSequenceId));
407 }
408 } else {
409 // Send fail, go on to retry send
410 SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inRetransmitCount]() {
411 // Note: toTarget and inBuff and inSequenceId should be captured by value (must not by reference)
412 SendLabelExchange(toTarget, inBuff, inSequenceId, inRetransmitCount); // Just do retry send
413 }, TIME_LAPSE_FOR_RETRY_SEND);
414 if (error == E_OK) {
415 delete cloneBuffer;
416 cloneBuffer = nullptr;
417 }
418 }
419 }
420
SendLabelExchangeAck(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint64_t inAckTriggerId)421 void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff,
422 uint64_t inSequenceId, uint64_t inAckTriggerId)
423 {
424 // Check whether have the need to send
425 bool noNeedToSend = false;
426 {
427 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
428 // Now that LabelExchange is received, LabelExchangeAck should be send no matter target online or not
429 if (topRecvLabelSeq_.count(toTarget) != 0 && topRecvLabelSeq_[toTarget] > inSequenceId) {
430 // topRecvLabelSeq for this target may have been erased, detect it for avoid creating an entry
431 // New LabelExchange had been received for this target, so this event can be abort
432 noNeedToSend = true;
433 }
434 if (ackTriggerId_[toTarget] > inAckTriggerId) {
435 // New LabelExchangeAck had been trigger for this target, so this event can be abort
436 noNeedToSend = true;
437 }
438 if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
439 LOGI("[Linker][SendAck] NoNeedSend:target=%s{private}, thisSeqId=%llu, topRecLabelSeq=%llu, thisAckId=%llu,"
440 "ackTriggerId=%llu.", toTarget.c_str(), ULL(inSequenceId), // ~0 indacate no label ever recv
441 ULL((topRecvLabelSeq_.count(toTarget) != 0) ? topRecvLabelSeq_[toTarget] : ~ULL(0)),
442 ULL(inAckTriggerId), ULL(ackTriggerId_[toTarget]));
443 }
444 }
445 if (noNeedToSend) {
446 delete inBuff;
447 inBuff = nullptr;
448 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
449 return;
450 }
451
452 TaskConfig config{true, 0, Priority::HIGH};
453 int errCode = aggregator_->CreateSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK, config);
454 if (errCode == E_OK) {
455 // Send ok, finish event
456 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
457 } else {
458 // Send fail, go on to retry send
459 SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inAckTriggerId]() {
460 // Note: toTarget, inBuff, inSequenceId, inAckTriggerId should be captured by value (must not by reference)
461 SendLabelExchangeAck(toTarget, inBuff, inSequenceId, inAckTriggerId);
462 }, TIME_LAPSE_FOR_RETRY_SEND);
463 }
464 }
465
// NOTE(review): presumably expands to the object-tag helper definitions for CommunicatorLinker - confirm against
// the macro's definition, which is not visible in this file.
DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker)
467 } // namespace DistributedDB
468