• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_linker.h"
17 #include "communicator_aggregator.h"
18 #include "db_errno.h"
19 #include "hash.h"
20 #include "log_print.h"
21 #include "platform_specific.h"
22 #include "protocol_proto.h"
23 
24 namespace DistributedDB {
namespace {
// Timing and retry policy used by the label-exchange handshake in this file. Durations are in milliseconds.

// Base interval to wait for a LabelExchangeAck before retransmitting the LabelExchange frame (5s).
constexpr uint32_t TIME_LAPSE_FOR_WAITING_ACK = 5U * 1000U;
// Delay before re-attempting a send that could not be scheduled (1s).
constexpr uint32_t TIME_LAPSE_FOR_RETRY_SEND = 1U * 1000U;
// Upper bound of retransmissions performed while no ack is received.
constexpr uint32_t RETRANSMIT_LIMIT = 20U;
// Retransmissions up to this count use the base interval; later ones use a growing interval.
constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5U;
} // anonymous namespace
31 
CommunicatorLinker(CommunicatorAggregator * inAggregator)32 CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator)
33     : incSequenceId_(0), incAckTriggerId_(0)
34 {
35     aggregator_ = inAggregator;
36     RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator
37 }
38 
~CommunicatorLinker()39 CommunicatorLinker::~CommunicatorLinker()
40 {
41     RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator
42     aggregator_ = nullptr;
43 }
44 
Initialize()45 void CommunicatorLinker::Initialize()
46 {
47     uint64_t curTime = 0;
48     int errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
49     if (errCode != E_OK) {
50         LOGW("[Linker][Init] Get systime fail, use default, errCode=%d.", errCode);
51     }
52     std::string curTimeStr = std::to_string(curTime);
53     localDistinctValue_ = Hash::HashFunc(curTimeStr);
54     LOGI("[Linker][Init] curTime=%" PRIu64 ", distinct=%" PRIu64 ".", ULL(curTime), ULL(localDistinctValue_));
55 }
56 
57 // Create async task to send out label_exchange and waiting for label_exchange_ack.
58 // If waiting timeout, pass the send&wait task to overrall timing retry task.
TargetOnline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)59 int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
60 {
61     {
62         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
63         // if inTarget is offline before, use the remembered previous online labels to decide which communicator to be
64         // notified online. Such handling is in case for abnormal unilateral offline, which A and B is notified online
65         // mutually, then B is notified A offline and for a while B is notified A online again, but A feels no notify.
66         if (remoteOnlineTarget_.count(inTarget) == 0) {
67             outRelatedLabels = targetMapOnlineLabels_[inTarget];
68             remoteOnlineTarget_.insert(inTarget);
69         }
70     }
71     return TriggerLabelExchangeEvent(inTarget);
72 }
73 
74 // Clear all labels related to this target. Let no longer waiting for ack of this target.
75 // The caller should notify all related communicator about this target offline.
TargetOffline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)76 void CommunicatorLinker::TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
77 {
78     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
79     outRelatedLabels = targetMapOnlineLabels_[inTarget];
80     // Do not erase the Labels of inTarget from targetMapOnlineLabels_, remember it for using when TargetOnline
81     remoteOnlineTarget_.erase(inTarget);
82     // Note: The process of remote target may quit, when remote target restart,
83     // the distinctValue of this remote target may be changed, and the sequenceId may start from zero
84     targetDistinctValue_.erase(inTarget);
85     topRecvLabelSeq_.erase(inTarget);
86 }
87 
88 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
89 // If waiting timeout, pass the send&wait task to overrall timing retry task.
90 // Find out targets for this label that is already online.
91 // The caller should notify communicator of this label about already online target.
IncreaseLocalLabel(const LabelType & inLabel,std::set<std::string> & outOnlineTarget)92 int CommunicatorLinker::IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget)
93 {
94     std::set<std::string> totalOnlineTargets;
95     {
96         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
97         localOnlineLabels_.insert(inLabel);
98         totalOnlineTargets = remoteOnlineTarget_;
99         for (auto &entry : targetMapOnlineLabels_) {
100             if (remoteOnlineTarget_.count(entry.first) == 0) { // Ignore offline target
101                 continue;
102             }
103             if (entry.second.count(inLabel) != 0) { // This online target had opened then same Label
104                 outOnlineTarget.insert(entry.first);
105             }
106         }
107     }
108     bool everFail = false;
109     for (const auto &entry : totalOnlineTargets) {
110         int errCode = TriggerLabelExchangeEvent(entry);
111         if (errCode != E_OK) {
112             everFail = true;
113         }
114     }
115     return everFail ? -E_INTERNAL_ERROR : E_OK;
116 }
117 
118 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
119 // If waiting timeout, pass the send&wait task to overrall timing retry task.
DecreaseLocalLabel(const LabelType & inLabel)120 int CommunicatorLinker::DecreaseLocalLabel(const LabelType &inLabel)
121 {
122     std::set<std::string> totalOnlineTargets;
123     {
124         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
125         localOnlineLabels_.erase(inLabel);
126         totalOnlineTargets = remoteOnlineTarget_;
127     }
128     bool everFail = false;
129     for (const auto &entry : totalOnlineTargets) {
130         int errCode = TriggerLabelExchangeEvent(entry);
131         if (errCode != E_OK) {
132             everFail = true;
133         }
134     }
135     return everFail ? -E_INTERNAL_ERROR : E_OK;
136 }
137 
138 // Compare the latest labels with previous Label, find out label changes.
139 // The caller should notify the target changes according to label changes.
140 // Update the online labels of this target. Send out label_exchange_ack.
ReceiveLabelExchange(const std::string & inTarget,const std::set<LabelType> & inLatestLabels,uint64_t inDistinctValue,uint64_t inSequenceId,std::map<LabelType,bool> & outChangeLabels)141 int CommunicatorLinker::ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels,
142     uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels)
143 {
144     {
145         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
146         DetectDistinctValueChange(inTarget, inDistinctValue);
147         if (topRecvLabelSeq_.count(inTarget) == 0) {
148             // Firstly receive LabelExchange from this target
149             topRecvLabelSeq_[inTarget] = inSequenceId;
150         } else if (inSequenceId < topRecvLabelSeq_[inTarget]) {
151             // inSequenceId can be equal to topRecvLabelSeq, in this case, the ack of this sequence send to this target
152             // may be lost, this target resend LabelExchange, and we should resend ack to this target
153             LOGW("[Linker][RecvLabel] inSequenceId=%" PRIu64 " smaller than topRecvLabelSeq=%" PRIu64
154                 ". Frame Ignored.", ULL(inSequenceId), ULL(topRecvLabelSeq_[inTarget]));
155             return -E_OUT_OF_DATE;
156         } else {
157             // Update top sequenceId of received LabelExchange
158             topRecvLabelSeq_[inTarget] = inSequenceId;
159         }
160         // Find out online labels by check difference
161         for (auto &entry : inLatestLabels) {
162             if (targetMapOnlineLabels_[inTarget].count(entry) == 0) {
163                 outChangeLabels[entry] = true;
164             }
165         }
166         // Find out offline labels by check difference
167         for (const auto &entry : targetMapOnlineLabels_[inTarget]) {
168             if (inLatestLabels.count(entry) == 0) {
169                 outChangeLabels[entry] = false;
170             }
171         }
172         // Update target online labels
173         targetMapOnlineLabels_[inTarget] = inLatestLabels;
174     }
175     // Trigger sending ack
176     int errCode = TriggerLabelExchangeAckEvent(inTarget, inSequenceId);
177     if (errCode != E_OK) {
178         LOGE("[Linker][RecvLabel] TriggerAckEvent Fail, Just Log, errCode=%d.", errCode);
179         // Do not return error here
180     }
181     return E_OK;
182 }
183 
184 // Waiting finish if the ack is what linker wait by check inSequenceId
185 // Similarly, stop the retry task of this Target.
ReceiveLabelExchangeAck(const std::string & inTarget,uint64_t inDistinctValue,uint64_t inSequenceId)186 int CommunicatorLinker::ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue,
187     uint64_t inSequenceId)
188 {
189     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
190     DetectDistinctValueChange(inTarget, inDistinctValue);
191     // This two judge is for detecting case that local device process restart so incSequenceId_ restart from 0
192     // The remote device may send an ack cause by previous process, which may destroy the functionality of this process
193     if (waitAckSeq_.count(inTarget) == 0) {
194         LOGW("[Linker][RecvAck] Not waiting any ack now, inSequenceId=%" PRIu64 "", ULL(inSequenceId));
195         return -E_NOT_FOUND;
196     }
197     if (waitAckSeq_[inTarget] < inSequenceId) {
198         LOGW("[Linker][RecvAck] Not waiting this ack now, inSequenceId=%" PRIu64 ", waitAckSeq_=%" PRIu64,
199             ULL(inSequenceId), ULL(waitAckSeq_[inTarget]));
200         return -E_NOT_FOUND;
201     }
202     // An valid ack received
203     if (recvAckSeq_.count(inTarget) == 0) {
204         // Firstly receive LabelExchangeAck from this target
205         recvAckSeq_[inTarget] = inSequenceId;
206     } else if (inSequenceId <= recvAckSeq_[inTarget]) {
207         LOGW("[Linker][RecvAck] inSequenceId=%" PRIu64 " not greater than recvAckSeq_=%" PRIu64 ". Frame Ignored.",
208             ULL(inSequenceId), ULL(recvAckSeq_[inTarget]));
209         return -E_OUT_OF_DATE;
210     } else {
211         // Update top sequenceId of received LabelExchangeAck
212         recvAckSeq_[inTarget] = inSequenceId;
213     }
214     return E_OK;
215 }
216 
GetOnlineRemoteTarget() const217 std::set<std::string> CommunicatorLinker::GetOnlineRemoteTarget() const
218 {
219     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
220     return remoteOnlineTarget_;
221 }
222 
IsRemoteTargetOnline(const std::string & inTarget) const223 bool CommunicatorLinker::IsRemoteTargetOnline(const std::string &inTarget) const
224 {
225     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
226     if (remoteOnlineTarget_.count(inTarget) != 0) {
227         return true;
228     }
229     return false;
230 }
231 
232 // inCountDown is in millisecond
SuspendByOnceTimer(const std::function<void (void)> & inAction,uint32_t inCountDown)233 void CommunicatorLinker::SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown)
234 {
235     TimerId thisTimerId = 0;
236     RuntimeContext *context = RuntimeContext::GetInstance();
237     int errCode = context->SetTimer(static_cast<int>(inCountDown), [inAction](TimerId inTimerId)->int{
238         // Note: inAction should be captured by value (must not by reference)
239         inAction();
240         return -E_END_TIMER;
241     }, nullptr, thisTimerId);
242     if (errCode == E_OK) {
243         return;
244     }
245     std::thread timerThread([inAction, inCountDown]() {
246         // Note: inAction and inCountDown should be captured by value (must not by reference)
247         std::this_thread::sleep_for(std::chrono::milliseconds(inCountDown));
248         inAction();
249     });
250     timerThread.detach();
251 }
252 
253 // This function should be called under protection of entireInfoMutex_
DetectDistinctValueChange(const std::string & inTarget,uint64_t inDistinctValue)254 void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue)
255 {
256     // Firstly received distinctValue from this target ever or after offline
257     if (targetDistinctValue_.count(inTarget) == 0) {
258         targetDistinctValue_.try_emplace(inTarget, inDistinctValue);
259         return;
260     }
261 
262     // DistinctValue is the same as before
263     if (targetDistinctValue_[inTarget] == inDistinctValue) {
264         return;
265     }
266 
267     // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component.
268     LOGE("[Linker][Detect] ######## DISTINCT VALUE CHANGE DETECTED : %" PRIu64 " VS %" PRIu64 " ########",
269         ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget]));
270     targetDistinctValue_[inTarget] = inDistinctValue;
271     // The process of remote target must have undergone a quit and restart, the remote sequenceId will start from zero.
272     topRecvLabelSeq_.erase(inTarget);
273     RefObject::IncObjRef(this);
274     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, inTarget]() {
275         LOGD("ReTrigger label exchange because remote process restarted!");
276         this->TriggerLabelExchangeEvent(inTarget);
277         RefObject::DecObjRef(this);
278     });
279     if (errCode != E_OK) {
280         LOGD("ReTrigger label exchange failed! errCode = %d", errCode);
281         RefObject::DecObjRef(this);
282     }
283 }
284 
TriggerLabelExchangeEvent(const std::string & toTarget)285 int CommunicatorLinker::TriggerLabelExchangeEvent(const std::string &toTarget)
286 {
287     // Apply for a latest sequenceId
288     uint64_t sequenceId = incSequenceId_.fetch_add(1, std::memory_order_seq_cst);
289     // Get a snapshot of current online labels
290     std::set<LabelType> onlineLabels;
291     {
292         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
293         onlineLabels = localOnlineLabels_;
294     }
295     // Build LabelExchange Frame
296     int error = E_OK;
297     SerialBuffer *buffer = ProtocolProto::BuildLabelExchange(localDistinctValue_, sequenceId, onlineLabels, error);
298     if (error != E_OK) {
299         LOGE("[Linker][TriggerLabel] BuildLabel fail, error=%d", error);
300         return error;
301     }
302     // Update waitAckSeq, Check whether new event be triggered in other thread
303     {
304         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
305         if (waitAckSeq_.count(toTarget) == 0) {
306             // Firstly send LabelExchange to this target
307             waitAckSeq_[toTarget] = sequenceId;
308         } else if (waitAckSeq_[toTarget] > sequenceId) {
309             // New LabelExchangeEvent had been trigger for this target, so this event can be abort
310             LOGI("[Linker][TriggerLabel] Detect newSeqId=%" PRIu64 " than thisSeqId=%" PRIu64
311                 " be triggered for target=%s{private}", ULL(waitAckSeq_[toTarget]), ULL(sequenceId), toTarget.c_str());
312             delete buffer;
313             buffer = nullptr;
314             return E_OK;
315         } else {
316             waitAckSeq_[toTarget] = sequenceId;
317         }
318     }
319     // Synchronously call SendLabelExchange and hand over buffer to it
320     RefObject::IncObjRef(this); // SendLabelExchange will only DecRef when total done if no need to send
321     SendLabelExchange(toTarget, buffer, sequenceId, 0); // Initially retransmitCount is 0
322     return E_OK;
323 }
324 
TriggerLabelExchangeAckEvent(const std::string & toTarget,uint64_t inSequenceId)325 int CommunicatorLinker::TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId)
326 {
327     // Build LabelExchangeAck Frame
328     int errCode = E_OK;
329     SerialBuffer *buffer = ProtocolProto::BuildLabelExchangeAck(localDistinctValue_, inSequenceId, errCode);
330     if (errCode != E_OK) {
331         LOGE("[Linker][TriggerAck] BuildAck fail, error=%d", errCode);
332         return errCode;
333     }
334     // Apply for a latest ackId and update ackTriggerId_
335     uint64_t ackId;
336     {
337         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
338         ackId = incAckTriggerId_.fetch_add(1, std::memory_order_seq_cst);
339         ackTriggerId_[toTarget] = ackId;
340     }
341     // Synchronously call SendLabelExchangeAck and hand over buffer to it
342     RefObject::IncObjRef(this); // SendLabelExchangeAck will only DecRef when total done if no need to send
343     SendLabelExchangeAck(toTarget, buffer, inSequenceId, ackId);
344     return E_OK;
345 }
346 
347 namespace {
GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)348 inline uint32_t GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)
349 {
350     if (inRetransmitCount <= RETRANSMIT_LIMIT_EQUAL_INTERVAL) {
351         return TIME_LAPSE_FOR_WAITING_ACK;
352     }
353     uint32_t subsequentRetransmit = inRetransmitCount - RETRANSMIT_LIMIT_EQUAL_INTERVAL;
354     return subsequentRetransmit * subsequentRetransmit * TIME_LAPSE_FOR_WAITING_ACK;
355 }
356 }
357 
SendLabelExchange(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint32_t inRetransmitCount)358 void CommunicatorLinker::SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId,
359     uint32_t inRetransmitCount)
360 {
361     // Check whether have the need to send
362     bool noNeedToSend = inRetransmitCount > RETRANSMIT_LIMIT;
363     {
364         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
365         if (remoteOnlineTarget_.count(toTarget) == 0) {
366             // Target offline
367             noNeedToSend = true;
368         }
369         if (waitAckSeq_[toTarget] > inSequenceId) {
370             // New LabelExchangeEvent had been trigger for this target, so this event can be abort
371             noNeedToSend = true;
372         }
373         if (recvAckSeq_.count(toTarget) != 0 && recvAckSeq_[toTarget] >= inSequenceId) {
374             // Ack of this sequenceId had been received or even later ack had been received
375             noNeedToSend = true;
376         }
377         if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
378             LOGI("[Linker][SendLabel] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", waitAckSeq=%" PRIu64
379                 ", recvAckSeq=%" PRIu64 ",retrans=%u.", toTarget.c_str(), ULL(inSequenceId),
380                 ULL(waitAckSeq_[toTarget]), ULL((recvAckSeq_.count(toTarget) != 0) ? recvAckSeq_[toTarget] : ~ULL(0)),
381                 inRetransmitCount);
382         } // ~0 indicate no ack ever recv
383     }
384     if (noNeedToSend) {
385         delete inBuff;
386         inBuff = nullptr;
387         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
388         return;
389     }
390 
391     int error = E_OK;
392     SerialBuffer *cloneBuffer = inBuff->Clone(error);
393     TaskConfig config{true, 0, Priority::HIGH};
394     int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE, config);
395     if (errCode == E_OK) {
396         // Send ok, go on to wait ack, and maybe resend
397         if (error == E_OK) {
398             SuspendByOnceTimer([this, toTarget, cloneBuffer, inSequenceId, inRetransmitCount]() {
399                 // Note: toTarget and cloneBuffer and inSequenceId should be captured by value (must not by reference)
400                 SendLabelExchange(toTarget, cloneBuffer, inSequenceId, inRetransmitCount + 1); // Do retransmission
401             }, GetDynamicTimeLapseForWaitingAck(inRetransmitCount));
402         } else {
403             LOGE("[Linker][SendLabel] CloneFail: target=%s{private}, SeqId=%" PRIu64 ".",
404             toTarget.c_str(), ULL(inSequenceId));
405         }
406     } else {
407         // Send fail, go on to retry send
408         SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inRetransmitCount]() {
409             // Note: toTarget and inBuff and inSequenceId should be captured by value (must not by reference)
410             SendLabelExchange(toTarget, inBuff, inSequenceId, inRetransmitCount); // Just do retry send
411         }, TIME_LAPSE_FOR_RETRY_SEND);
412         if (error == E_OK) {
413             delete cloneBuffer;
414             cloneBuffer = nullptr;
415         }
416     }
417 }
418 
SendLabelExchangeAck(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint64_t inAckTriggerId)419 void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff,
420     uint64_t inSequenceId, uint64_t inAckTriggerId)
421 {
422     // Check whether have the need to send
423     bool noNeedToSend = false;
424     {
425         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
426         // Now that LabelExchange is received, LabelExchangeAck should be send no matter target online or not
427         if (topRecvLabelSeq_.count(toTarget) != 0 && topRecvLabelSeq_[toTarget] > inSequenceId) {
428             // topRecvLabelSeq for this target may have been erased, detect it for avoid creating an entry
429             // New LabelExchange had been received for this target, so this event can be abort
430             noNeedToSend = true;
431         }
432         if (ackTriggerId_[toTarget] > inAckTriggerId) {
433             // New LabelExchangeAck had been trigger for this target, so this event can be abort
434             noNeedToSend = true;
435         }
436         if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
437             LOGI("[Linker][SendAck] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", topRecLabelSeq=%" PRIu64
438                 ", thisAckId=%" PRIu64 ",ackTriggerId=%" PRIu64 ".",
439                 toTarget.c_str(), ULL(inSequenceId), // ~0 indacate no label ever recv
440                 ULL((topRecvLabelSeq_.count(toTarget) != 0) ? topRecvLabelSeq_[toTarget] : ~ULL(0)),
441                 ULL(inAckTriggerId), ULL(ackTriggerId_[toTarget]));
442         }
443     }
444     if (noNeedToSend) {
445         delete inBuff;
446         inBuff = nullptr;
447         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
448         return;
449     }
450 
451     TaskConfig config{true, 0, Priority::HIGH};
452     int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK, config);
453     if (errCode == E_OK) {
454         // Send ok, finish event
455         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
456     } else {
457         // Send fail, go on to retry send
458         SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inAckTriggerId]() {
459             // Note: toTarget, inBuff, inSequenceId, inAckTriggerId should be captured by value (must not by reference)
460             SendLabelExchangeAck(toTarget, inBuff, inSequenceId, inAckTriggerId);
461         }, TIME_LAPSE_FOR_RETRY_SEND);
462     }
463 }
464 
465 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker)
466 } // namespace DistributedDB
467