• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_linker.h"
17 #include "hash.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "protocol_proto.h"
21 #include "platform_specific.h"
22 #include "communicator_aggregator.h"
23 
24 namespace DistributedDB {
25 namespace {
26 constexpr uint32_t TIME_LAPSE_FOR_WAITING_ACK = 5000; // 5s
27 constexpr uint32_t TIME_LAPSE_FOR_RETRY_SEND = 1000; // 1s
28 constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransmission if no ack received
29 constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval
30 }
31 
CommunicatorLinker(CommunicatorAggregator * inAggregator)32 CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator)
33     : incSequenceId_(0), incAckTriggerId_(0)
34 {
35     aggregator_ = inAggregator;
36     RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator
37 }
38 
~CommunicatorLinker()39 CommunicatorLinker::~CommunicatorLinker()
40 {
41     RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator
42     aggregator_ = nullptr;
43 }
44 
Initialize()45 void CommunicatorLinker::Initialize()
46 {
47     uint64_t curTime = 0;
48     int errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
49     if (errCode != E_OK) {
50         LOGW("[Linker][Init] Get systime fail, use default, errCode=%d.", errCode);
51     }
52     std::string curTimeStr = std::to_string(curTime);
53     localDistinctValue_ = Hash::HashFunc(curTimeStr);
54     LOGI("[Linker][Init] curTime=%llu, distinct=%llu.", ULL(curTime), ULL(localDistinctValue_));
55 }
56 
57 // Create async task to send out label_exchange and waiting for label_exchange_ack.
58 // If waiting timeout, pass the send&wait task to overrall timing retry task.
TargetOnline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)59 int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
60 {
61     {
62         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
63         // if inTarget is offline before, use the remembered previous online labels to decide which communicator to be
64         // notified online. Such handling is in case for abnormal unilateral offline, which A and B is notified online
65         // mutually, then B is notified A offline and for a while B is notified A online again, but A feels no notify.
66         if (remoteOnlineTarget_.count(inTarget) == 0) {
67             outRelatedLabels = targetMapOnlineLabels_[inTarget];
68             remoteOnlineTarget_.insert(inTarget);
69         }
70     }
71     return TriggerLabelExchangeEvent(inTarget);
72 }
73 
74 // Clear all labels related to this target. Let no longer waiting for ack of this target.
75 // The caller should notify all related communicator about this target offline.
TargetOffline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)76 int CommunicatorLinker::TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
77 {
78     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
79     outRelatedLabels = targetMapOnlineLabels_[inTarget];
80     // Do not erase the Labels of inTarget from targetMapOnlineLabels_, remember it for using when TargetOnline
81     remoteOnlineTarget_.erase(inTarget);
82     // Note: The process of remote target may quit, when remote target restart,
83     // the distinctValue of this remote target may be changed, and the sequenceId may start from zero
84     targetDistinctValue_.erase(inTarget);
85     topRecvLabelSeq_.erase(inTarget);
86     return E_OK;
87 }
88 
89 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
90 // If waiting timeout, pass the send&wait task to overrall timing retry task.
91 // Find out targets for this label that is already online.
92 // The caller should notify communicator of this label about already online target.
IncreaseLocalLabel(const LabelType & inLabel,std::set<std::string> & outOnlineTarget)93 int CommunicatorLinker::IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget)
94 {
95     std::set<std::string> totalOnlineTargets;
96     {
97         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
98         localOnlineLabels_.insert(inLabel);
99         totalOnlineTargets = remoteOnlineTarget_;
100         for (auto &entry : targetMapOnlineLabels_) {
101             if (remoteOnlineTarget_.count(entry.first) == 0) { // Ignore offline target
102                 continue;
103             }
104             if (entry.second.count(inLabel) != 0) { // This online target had opened then same Label
105                 outOnlineTarget.insert(entry.first);
106             }
107         }
108     }
109     bool everFail = false;
110     for (const auto &entry : totalOnlineTargets) {
111         int errCode = TriggerLabelExchangeEvent(entry);
112         if (errCode != E_OK) {
113             everFail = true;
114         }
115     }
116     return everFail ? -E_INTERNAL_ERROR : E_OK;
117 }
118 
119 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
120 // If waiting timeout, pass the send&wait task to overrall timing retry task.
DecreaseLocalLabel(const LabelType & inLabel)121 int CommunicatorLinker::DecreaseLocalLabel(const LabelType &inLabel)
122 {
123     std::set<std::string> totalOnlineTargets;
124     {
125         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
126         localOnlineLabels_.erase(inLabel);
127         totalOnlineTargets = remoteOnlineTarget_;
128     }
129     bool everFail = false;
130     for (const auto &entry : totalOnlineTargets) {
131         int errCode = TriggerLabelExchangeEvent(entry);
132         if (errCode != E_OK) {
133             everFail = true;
134         }
135     }
136     return everFail ? -E_INTERNAL_ERROR : E_OK;
137 }
138 
139 // Compare the latest labels with previous Label, find out label changes.
140 // The caller should notify the target changes according to label changes.
141 // Update the online labels of this target. Send out label_exchange_ack.
ReceiveLabelExchange(const std::string & inTarget,const std::set<LabelType> & inLatestLabels,uint64_t inDistinctValue,uint64_t inSequenceId,std::map<LabelType,bool> & outChangeLabels)142 int CommunicatorLinker::ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels,
143     uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels)
144 {
145     {
146         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
147         DetectDistinctValueChange(inTarget, inDistinctValue);
148         if (topRecvLabelSeq_.count(inTarget) == 0) {
149             // Firstly receive LabelExchange from this target
150             topRecvLabelSeq_[inTarget] = inSequenceId;
151         } else if (inSequenceId < topRecvLabelSeq_[inTarget]) {
152             // inSequenceId can be equal to topRecvLabelSeq, in this case, the ack of this sequence send to this target
153             // may be lost, this target resend LabelExchange, and we should resend ack to this target
154             LOGW("[Linker][RecvLabel] inSequenceId=%llu smaller than topRecvLabelSeq=%llu. Frame Ignored.",
155                 ULL(inSequenceId), ULL(topRecvLabelSeq_[inTarget]));
156             return -E_OUT_OF_DATE;
157         } else {
158             // Update top sequenceId of received LabelExchange
159             topRecvLabelSeq_[inTarget] = inSequenceId;
160         }
161         // Find out online labels by check difference
162         for (auto &entry : inLatestLabels) {
163             if (targetMapOnlineLabels_[inTarget].count(entry) == 0) {
164                 outChangeLabels[entry] = true;
165             }
166         }
167         // Find out offline labels by check difference
168         for (const auto &entry : targetMapOnlineLabels_[inTarget]) {
169             if (inLatestLabels.count(entry) == 0) {
170                 outChangeLabels[entry] = false;
171             }
172         }
173         // Update target online labels
174         targetMapOnlineLabels_[inTarget] = inLatestLabels;
175     }
176     // Trigger sending ack
177     int errCode = TriggerLabelExchangeAckEvent(inTarget, inSequenceId);
178     if (errCode != E_OK) {
179         LOGE("[Linker][RecvLabel] TriggerAckEvent Fail, Just Log, errCode=%d.", errCode);
180         // Do not return error here
181     }
182     return E_OK;
183 }
184 
185 // Waiting finish if the ack is what linker wait by check inSequenceId
186 // Similarly, stop the retry task of this Target.
ReceiveLabelExchangeAck(const std::string & inTarget,uint64_t inDistinctValue,uint64_t inSequenceId)187 int CommunicatorLinker::ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue,
188     uint64_t inSequenceId)
189 {
190     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
191     DetectDistinctValueChange(inTarget, inDistinctValue);
192     // This two judge is for detecting case that local device process restart so incSequenceId_ restart from 0
193     // The remote device may send an ack cause by previous process, which may destroy the functionality of this process
194     if (waitAckSeq_.count(inTarget) == 0) {
195         LOGW("[Linker][RecvAck] Not waiting any ack now, inSequenceId=%llu", ULL(inSequenceId));
196         return -E_NOT_FOUND;
197     }
198     if (waitAckSeq_[inTarget] < inSequenceId) {
199         LOGW("[Linker][RecvAck] Not waiting this ack now, inSequenceId=%llu, waitAckSeq_=%llu",
200             ULL(inSequenceId), ULL(waitAckSeq_[inTarget]));
201         return -E_NOT_FOUND;
202     }
203     // An valid ack received
204     if (recvAckSeq_.count(inTarget) == 0) {
205         // Firstly receive LabelExchangeAck from this target
206         recvAckSeq_[inTarget] = inSequenceId;
207     } else if (inSequenceId <= recvAckSeq_[inTarget]) {
208         LOGW("[Linker][RecvAck] inSequenceId=%llu not greater than recvAckSeq_=%llu. Frame Ignored.",
209             ULL(inSequenceId), ULL(recvAckSeq_[inTarget]));
210         return -E_OUT_OF_DATE;
211     } else {
212         // Update top sequenceId of received LabelExchangeAck
213         recvAckSeq_[inTarget] = inSequenceId;
214     }
215     return E_OK;
216 }
217 
GetOnlineRemoteTarget() const218 std::set<std::string> CommunicatorLinker::GetOnlineRemoteTarget() const
219 {
220     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
221     return remoteOnlineTarget_;
222 }
223 
IsRemoteTargetOnline(const std::string & inTarget) const224 bool CommunicatorLinker::IsRemoteTargetOnline(const std::string &inTarget) const
225 {
226     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
227     if (remoteOnlineTarget_.count(inTarget) != 0) {
228         return true;
229     }
230     return false;
231 }
232 
233 // inCountDown is in millisecond
SuspendByOnceTimer(const std::function<void (void)> & inAction,uint32_t inCountDown)234 void CommunicatorLinker::SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown)
235 {
236     TimerId thisTimerId = 0;
237     RuntimeContext *context = RuntimeContext::GetInstance();
238     int errCode = context->SetTimer(static_cast<int>(inCountDown), [inAction](TimerId inTimerId)->int{
239         // Note: inAction should be captured by value (must not by reference)
240         LOGI("[Linker][Suspend] Timer Due : inTimerId=%llu.", ULL(inTimerId));
241         inAction();
242         return -E_END_TIMER;
243     }, nullptr, thisTimerId);
244     if (errCode == E_OK) {
245         LOGI("[Linker][Suspend] SetTimer Success : thisTimerId=%llu, wait=%u(ms).", ULL(thisTimerId), inCountDown);
246     } else {
247         LOGI("[Linker][Suspend] SetTimer Fail Raise Thread Instead : errCode=%d, wait=%u(ms).", errCode, inCountDown);
248         std::thread timerThread([inAction, inCountDown]() {
249             // Note: inAction and inCountDown should be captured by value (must not by reference)
250             std::this_thread::sleep_for(std::chrono::milliseconds(inCountDown));
251             inAction();
252         });
253         timerThread.detach();
254     }
255 }
256 
257 // This function should be called under protection of entireInfoMutex_
DetectDistinctValueChange(const std::string & inTarget,uint64_t inDistinctValue)258 void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue)
259 {
260     // Firstly received distinctValue from this target ever or after offline
261     if (targetDistinctValue_.count(inTarget) == 0) {
262         targetDistinctValue_.try_emplace(inTarget, inDistinctValue);
263         return;
264     }
265 
266     // DistinctValue is the same as before
267     if (targetDistinctValue_[inTarget] == inDistinctValue) {
268         return;
269     }
270 
271     // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component.
272     LOGE("[Linker][Detect] ######## DISTINCT VALUE CHANGE DETECTED : %llu VS %llu ########",
273         ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget]));
274     targetDistinctValue_[inTarget] = inDistinctValue;
275     // The process of remote target must have undergone a quit and restart, the remote sequenceId will start from zero.
276     topRecvLabelSeq_.erase(inTarget);
277     RefObject::IncObjRef(this);
278     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, inTarget]() {
279         LOGD("ReTrigger label exchange because remote process restarted!");
280         this->TriggerLabelExchangeEvent(inTarget);
281         RefObject::DecObjRef(this);
282     });
283     if (errCode != E_OK) {
284         LOGD("ReTrigger label exchange failed! errCode = %d", errCode);
285         RefObject::DecObjRef(this);
286     }
287 }
288 
TriggerLabelExchangeEvent(const std::string & toTarget)289 int CommunicatorLinker::TriggerLabelExchangeEvent(const std::string &toTarget)
290 {
291     // Apply for a latest sequenceId
292     uint64_t sequenceId = incSequenceId_.fetch_add(1, std::memory_order_seq_cst);
293     // Get a snapshot of current online labels
294     std::set<LabelType> onlineLabels;
295     {
296         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
297         onlineLabels = localOnlineLabels_;
298     }
299     // Build LabelExchange Frame
300     int error = E_OK;
301     SerialBuffer *buffer = ProtocolProto::BuildLabelExchange(localDistinctValue_, sequenceId, onlineLabels, error);
302     if (error != E_OK) {
303         LOGE("[Linker][TriggerLabel] BuildLabel fail, error=%d", error);
304         return error;
305     }
306     // Update waitAckSeq, Check whether new event be triggered in other thread
307     {
308         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
309         if (waitAckSeq_.count(toTarget) == 0) {
310             // Firstly send LabelExchange to this target
311             waitAckSeq_[toTarget] = sequenceId;
312         } else if (waitAckSeq_[toTarget] > sequenceId) {
313             // New LabelExchangeEvent had been trigger for this target, so this event can be abort
314             LOGI("[Linker][TriggerLabel] Detect newSeqId=%llu than thisSeqId=%llu be triggered for target=%s{private}",
315                 ULL(waitAckSeq_[toTarget]), ULL(sequenceId), toTarget.c_str());
316             delete buffer;
317             buffer = nullptr;
318             return E_OK;
319         } else {
320             waitAckSeq_[toTarget] = sequenceId;
321         }
322     }
323     // Synchronously call SendLabelExchange and hand over buffer to it
324     RefObject::IncObjRef(this); // SendLabelExchange will only DecRef when total done if no need to send
325     SendLabelExchange(toTarget, buffer, sequenceId, 0); // Initially retransmitCount is 0
326     return E_OK;
327 }
328 
TriggerLabelExchangeAckEvent(const std::string & toTarget,uint64_t inSequenceId)329 int CommunicatorLinker::TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId)
330 {
331     // Build LabelExchangeAck Frame
332     int errCode = E_OK;
333     SerialBuffer *buffer = ProtocolProto::BuildLabelExchangeAck(localDistinctValue_, inSequenceId, errCode);
334     if (errCode != E_OK) {
335         LOGE("[Linker][TriggerAck] BuildAck fail, error=%d", errCode);
336         return errCode;
337     }
338     // Apply for a latest ackId and update ackTriggerId_
339     uint64_t ackId;
340     {
341         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
342         ackId = incAckTriggerId_.fetch_add(1, std::memory_order_seq_cst);
343         ackTriggerId_[toTarget] = ackId;
344     }
345     // Synchronously call SendLabelExchangeAck and hand over buffer to it
346     RefObject::IncObjRef(this); // SendLabelExchangeAck will only DecRef when total done if no need to send
347     SendLabelExchangeAck(toTarget, buffer, inSequenceId, ackId);
348     return E_OK;
349 }
350 
351 namespace {
GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)352 inline uint32_t GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)
353 {
354     if (inRetransmitCount <= RETRANSMIT_LIMIT_EQUAL_INTERVAL) {
355         return TIME_LAPSE_FOR_WAITING_ACK;
356     }
357     uint32_t subsequentRetransmit = inRetransmitCount - RETRANSMIT_LIMIT_EQUAL_INTERVAL;
358     return subsequentRetransmit * subsequentRetransmit * TIME_LAPSE_FOR_WAITING_ACK;
359 }
360 }
361 
SendLabelExchange(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint32_t inRetransmitCount)362 void CommunicatorLinker::SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId,
363     uint32_t inRetransmitCount)
364 {
365     // Check whether have the need to send
366     bool noNeedToSend = ((inRetransmitCount <= RETRANSMIT_LIMIT) ? false : true);
367     {
368         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
369         if (remoteOnlineTarget_.count(toTarget) == 0) {
370             // Target offline
371             noNeedToSend = true;
372         }
373         if (waitAckSeq_[toTarget] > inSequenceId) {
374             // New LabelExchangeEvent had been trigger for this target, so this event can be abort
375             noNeedToSend = true;
376         }
377         if (recvAckSeq_.count(toTarget) != 0 && recvAckSeq_[toTarget] >= inSequenceId) {
378             // Ack of this sequenceId had been received or even later ack had been received
379             noNeedToSend = true;
380         }
381         if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
382             LOGI("[Linker][SendLabel] NoNeedSend:target=%s{private}, thisSeqId=%llu, waitAckSeq=%llu, recvAckSeq=%llu,"
383                 "retrans=%u.", toTarget.c_str(), ULL(inSequenceId), ULL(waitAckSeq_[toTarget]),
384                 ULL((recvAckSeq_.count(toTarget) != 0) ? recvAckSeq_[toTarget] : ~ULL(0)), inRetransmitCount);
385         } // ~0 indicate no ack ever recv
386     }
387     if (noNeedToSend) {
388         delete inBuff;
389         inBuff = nullptr;
390         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
391         return;
392     }
393 
394     int error = E_OK;
395     SerialBuffer *cloneBuffer = inBuff->Clone(error);
396     TaskConfig config{true, 0, Priority::HIGH};
397     int errCode = aggregator_->CreateSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE, config);
398     if (errCode == E_OK) {
399         // Send ok, go on to wait ack, and maybe resend
400         if (error == E_OK) {
401             SuspendByOnceTimer([this, toTarget, cloneBuffer, inSequenceId, inRetransmitCount]() {
402                 // Note: toTarget and cloneBuffer and inSequenceId should be captured by value (must not by reference)
403                 SendLabelExchange(toTarget, cloneBuffer, inSequenceId, inRetransmitCount + 1); // Do retransmission
404             }, GetDynamicTimeLapseForWaitingAck(inRetransmitCount));
405         } else {
406             LOGE("[Linker][SendLabel] CloneFail: target=%s{private}, SeqId=%llu.", toTarget.c_str(), ULL(inSequenceId));
407         }
408     } else {
409         // Send fail, go on to retry send
410         SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inRetransmitCount]() {
411             // Note: toTarget and inBuff and inSequenceId should be captured by value (must not by reference)
412             SendLabelExchange(toTarget, inBuff, inSequenceId, inRetransmitCount); // Just do retry send
413         }, TIME_LAPSE_FOR_RETRY_SEND);
414         if (error == E_OK) {
415             delete cloneBuffer;
416             cloneBuffer = nullptr;
417         }
418     }
419 }
420 
SendLabelExchangeAck(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint64_t inAckTriggerId)421 void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff,
422     uint64_t inSequenceId, uint64_t inAckTriggerId)
423 {
424     // Check whether have the need to send
425     bool noNeedToSend = false;
426     {
427         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
428         // Now that LabelExchange is received, LabelExchangeAck should be send no matter target online or not
429         if (topRecvLabelSeq_.count(toTarget) != 0 && topRecvLabelSeq_[toTarget] > inSequenceId) {
430             // topRecvLabelSeq for this target may have been erased, detect it for avoid creating an entry
431             // New LabelExchange had been received for this target, so this event can be abort
432             noNeedToSend = true;
433         }
434         if (ackTriggerId_[toTarget] > inAckTriggerId) {
435             // New LabelExchangeAck had been trigger for this target, so this event can be abort
436             noNeedToSend = true;
437         }
438         if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
439             LOGI("[Linker][SendAck] NoNeedSend:target=%s{private}, thisSeqId=%llu, topRecLabelSeq=%llu, thisAckId=%llu,"
440                 "ackTriggerId=%llu.", toTarget.c_str(), ULL(inSequenceId), // ~0 indacate no label ever recv
441                 ULL((topRecvLabelSeq_.count(toTarget) != 0) ? topRecvLabelSeq_[toTarget] : ~ULL(0)),
442                 ULL(inAckTriggerId), ULL(ackTriggerId_[toTarget]));
443         }
444     }
445     if (noNeedToSend) {
446         delete inBuff;
447         inBuff = nullptr;
448         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
449         return;
450     }
451 
452     TaskConfig config{true, 0, Priority::HIGH};
453     int errCode = aggregator_->CreateSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK, config);
454     if (errCode == E_OK) {
455         // Send ok, finish event
456         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
457     } else {
458         // Send fail, go on to retry send
459         SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inAckTriggerId]() {
460             // Note: toTarget, inBuff, inSequenceId, inAckTriggerId should be captured by value (must not by reference)
461             SendLabelExchangeAck(toTarget, inBuff, inSequenceId, inAckTriggerId);
462         }, TIME_LAPSE_FOR_RETRY_SEND);
463     }
464 }
465 
466 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker)
467 } // namespace DistributedDB
468