1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_linker.h"
17 #include "communicator_aggregator.h"
18 #include "db_errno.h"
19 #include "hash.h"
20 #include "log_print.h"
21 #include "platform_specific.h"
22 #include "protocol_proto.h"
23
24 namespace DistributedDB {
25 namespace {
26 constexpr uint32_t TIME_LAPSE_FOR_WAITING_ACK = 5000; // 5s
27 constexpr uint32_t TIME_LAPSE_FOR_RETRY_SEND = 1000; // 1s
28 constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransmission if no ack received
29 constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval
30 }
31
CommunicatorLinker(CommunicatorAggregator * inAggregator)32 CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator)
33 : incSequenceId_(0), incAckTriggerId_(0)
34 {
35 aggregator_ = inAggregator;
36 RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator
37 }
38
~CommunicatorLinker()39 CommunicatorLinker::~CommunicatorLinker()
40 {
41 RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator
42 aggregator_ = nullptr;
43 }
44
Initialize()45 void CommunicatorLinker::Initialize()
46 {
47 uint64_t curTime = 0;
48 int errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
49 if (errCode != E_OK) {
50 LOGW("[Linker][Init] Get systime fail, use default, errCode=%d.", errCode);
51 }
52 std::string curTimeStr = std::to_string(curTime);
53 localDistinctValue_ = Hash::HashFunc(curTimeStr);
54 LOGI("[Linker][Init] curTime=%" PRIu64 ", distinct=%" PRIu64 ".", ULL(curTime), ULL(localDistinctValue_));
55 }
56
57 // Create async task to send out label_exchange and waiting for label_exchange_ack.
58 // If waiting timeout, pass the send&wait task to overrall timing retry task.
TargetOnline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)59 int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
60 {
61 {
62 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
63 // if inTarget is offline before, use the remembered previous online labels to decide which communicator to be
64 // notified online. Such handling is in case for abnormal unilateral offline, which A and B is notified online
65 // mutually, then B is notified A offline and for a while B is notified A online again, but A feels no notify.
66 if (remoteOnlineTarget_.count(inTarget) == 0) {
67 outRelatedLabels = targetMapOnlineLabels_[inTarget];
68 remoteOnlineTarget_.insert(inTarget);
69 }
70 }
71 return TriggerLabelExchangeEvent(inTarget);
72 }
73
74 // Clear all labels related to this target. Let no longer waiting for ack of this target.
75 // The caller should notify all related communicator about this target offline.
TargetOffline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)76 void CommunicatorLinker::TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
77 {
78 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
79 outRelatedLabels = targetMapOnlineLabels_[inTarget];
80 // Do not erase the Labels of inTarget from targetMapOnlineLabels_, remember it for using when TargetOnline
81 remoteOnlineTarget_.erase(inTarget);
82 // Note: The process of remote target may quit, when remote target restart,
83 // the distinctValue of this remote target may be changed, and the sequenceId may start from zero
84 targetDistinctValue_.erase(inTarget);
85 topRecvLabelSeq_.erase(inTarget);
86 }
87
88 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
89 // If waiting timeout, pass the send&wait task to overrall timing retry task.
90 // Find out targets for this label that is already online.
91 // The caller should notify communicator of this label about already online target.
IncreaseLocalLabel(const LabelType & inLabel,std::set<std::string> & outOnlineTarget)92 int CommunicatorLinker::IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget)
93 {
94 std::set<std::string> totalOnlineTargets;
95 {
96 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
97 localOnlineLabels_.insert(inLabel);
98 totalOnlineTargets = remoteOnlineTarget_;
99 for (auto &entry : targetMapOnlineLabels_) {
100 if (remoteOnlineTarget_.count(entry.first) == 0) { // Ignore offline target
101 continue;
102 }
103 if (entry.second.count(inLabel) != 0) { // This online target had opened then same Label
104 outOnlineTarget.insert(entry.first);
105 }
106 }
107 }
108 bool everFail = false;
109 for (const auto &entry : totalOnlineTargets) {
110 int errCode = TriggerLabelExchangeEvent(entry);
111 if (errCode != E_OK) {
112 everFail = true;
113 }
114 }
115 return everFail ? -E_INTERNAL_ERROR : E_OK;
116 }
117
118 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
119 // If waiting timeout, pass the send&wait task to overrall timing retry task.
DecreaseLocalLabel(const LabelType & inLabel)120 int CommunicatorLinker::DecreaseLocalLabel(const LabelType &inLabel)
121 {
122 std::set<std::string> totalOnlineTargets;
123 {
124 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
125 localOnlineLabels_.erase(inLabel);
126 totalOnlineTargets = remoteOnlineTarget_;
127 }
128 bool everFail = false;
129 for (const auto &entry : totalOnlineTargets) {
130 int errCode = TriggerLabelExchangeEvent(entry);
131 if (errCode != E_OK) {
132 everFail = true;
133 }
134 }
135 return everFail ? -E_INTERNAL_ERROR : E_OK;
136 }
137
138 // Compare the latest labels with previous Label, find out label changes.
139 // The caller should notify the target changes according to label changes.
140 // Update the online labels of this target. Send out label_exchange_ack.
ReceiveLabelExchange(const std::string & inTarget,const std::set<LabelType> & inLatestLabels,uint64_t inDistinctValue,uint64_t inSequenceId,std::map<LabelType,bool> & outChangeLabels)141 int CommunicatorLinker::ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels,
142 uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels)
143 {
144 {
145 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
146 DetectDistinctValueChange(inTarget, inDistinctValue);
147 if (topRecvLabelSeq_.count(inTarget) == 0) {
148 // Firstly receive LabelExchange from this target
149 topRecvLabelSeq_[inTarget] = inSequenceId;
150 } else if (inSequenceId < topRecvLabelSeq_[inTarget]) {
151 // inSequenceId can be equal to topRecvLabelSeq, in this case, the ack of this sequence send to this target
152 // may be lost, this target resend LabelExchange, and we should resend ack to this target
153 LOGW("[Linker][RecvLabel] inSequenceId=%" PRIu64 " smaller than topRecvLabelSeq=%" PRIu64
154 ". Frame Ignored.", ULL(inSequenceId), ULL(topRecvLabelSeq_[inTarget]));
155 return -E_OUT_OF_DATE;
156 } else {
157 // Update top sequenceId of received LabelExchange
158 topRecvLabelSeq_[inTarget] = inSequenceId;
159 }
160 // Find out online labels by check difference
161 for (auto &entry : inLatestLabels) {
162 if (targetMapOnlineLabels_[inTarget].count(entry) == 0) {
163 outChangeLabels[entry] = true;
164 }
165 }
166 // Find out offline labels by check difference
167 for (const auto &entry : targetMapOnlineLabels_[inTarget]) {
168 if (inLatestLabels.count(entry) == 0) {
169 outChangeLabels[entry] = false;
170 }
171 }
172 // Update target online labels
173 targetMapOnlineLabels_[inTarget] = inLatestLabels;
174 }
175 // Trigger sending ack
176 int errCode = TriggerLabelExchangeAckEvent(inTarget, inSequenceId);
177 if (errCode != E_OK) {
178 LOGE("[Linker][RecvLabel] TriggerAckEvent Fail, Just Log, errCode=%d.", errCode);
179 // Do not return error here
180 }
181 return E_OK;
182 }
183
184 // Waiting finish if the ack is what linker wait by check inSequenceId
185 // Similarly, stop the retry task of this Target.
ReceiveLabelExchangeAck(const std::string & inTarget,uint64_t inDistinctValue,uint64_t inSequenceId)186 int CommunicatorLinker::ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue,
187 uint64_t inSequenceId)
188 {
189 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
190 DetectDistinctValueChange(inTarget, inDistinctValue);
191 // This two judge is for detecting case that local device process restart so incSequenceId_ restart from 0
192 // The remote device may send an ack cause by previous process, which may destroy the functionality of this process
193 if (waitAckSeq_.count(inTarget) == 0) {
194 LOGW("[Linker][RecvAck] Not waiting any ack now, inSequenceId=%" PRIu64 "", ULL(inSequenceId));
195 return -E_NOT_FOUND;
196 }
197 if (waitAckSeq_[inTarget] < inSequenceId) {
198 LOGW("[Linker][RecvAck] Not waiting this ack now, inSequenceId=%" PRIu64 ", waitAckSeq_=%" PRIu64,
199 ULL(inSequenceId), ULL(waitAckSeq_[inTarget]));
200 return -E_NOT_FOUND;
201 }
202 // An valid ack received
203 if (recvAckSeq_.count(inTarget) == 0) {
204 // Firstly receive LabelExchangeAck from this target
205 recvAckSeq_[inTarget] = inSequenceId;
206 } else if (inSequenceId <= recvAckSeq_[inTarget]) {
207 LOGW("[Linker][RecvAck] inSequenceId=%" PRIu64 " not greater than recvAckSeq_=%" PRIu64 ". Frame Ignored.",
208 ULL(inSequenceId), ULL(recvAckSeq_[inTarget]));
209 return -E_OUT_OF_DATE;
210 } else {
211 // Update top sequenceId of received LabelExchangeAck
212 recvAckSeq_[inTarget] = inSequenceId;
213 }
214 return E_OK;
215 }
216
GetOnlineRemoteTarget() const217 std::set<std::string> CommunicatorLinker::GetOnlineRemoteTarget() const
218 {
219 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
220 return remoteOnlineTarget_;
221 }
222
IsRemoteTargetOnline(const std::string & inTarget) const223 bool CommunicatorLinker::IsRemoteTargetOnline(const std::string &inTarget) const
224 {
225 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
226 if (remoteOnlineTarget_.count(inTarget) != 0) {
227 return true;
228 }
229 return false;
230 }
231
232 // inCountDown is in millisecond
SuspendByOnceTimer(const std::function<void (void)> & inAction,uint32_t inCountDown)233 void CommunicatorLinker::SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown)
234 {
235 TimerId thisTimerId = 0;
236 RuntimeContext *context = RuntimeContext::GetInstance();
237 int errCode = context->SetTimer(static_cast<int>(inCountDown), [inAction](TimerId inTimerId)->int{
238 // Note: inAction should be captured by value (must not by reference)
239 inAction();
240 return -E_END_TIMER;
241 }, nullptr, thisTimerId);
242 if (errCode == E_OK) {
243 return;
244 }
245 std::thread timerThread([inAction, inCountDown]() {
246 // Note: inAction and inCountDown should be captured by value (must not by reference)
247 std::this_thread::sleep_for(std::chrono::milliseconds(inCountDown));
248 inAction();
249 });
250 timerThread.detach();
251 }
252
253 // This function should be called under protection of entireInfoMutex_
DetectDistinctValueChange(const std::string & inTarget,uint64_t inDistinctValue)254 void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue)
255 {
256 // Firstly received distinctValue from this target ever or after offline
257 if (targetDistinctValue_.count(inTarget) == 0) {
258 targetDistinctValue_.try_emplace(inTarget, inDistinctValue);
259 return;
260 }
261
262 // DistinctValue is the same as before
263 if (targetDistinctValue_[inTarget] == inDistinctValue) {
264 return;
265 }
266
267 // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component.
268 LOGE("[Linker][Detect] ######## DISTINCT VALUE CHANGE DETECTED : %" PRIu64 " VS %" PRIu64 " ########",
269 ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget]));
270 targetDistinctValue_[inTarget] = inDistinctValue;
271 // The process of remote target must have undergone a quit and restart, the remote sequenceId will start from zero.
272 topRecvLabelSeq_.erase(inTarget);
273 RefObject::IncObjRef(this);
274 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, inTarget]() {
275 LOGD("ReTrigger label exchange because remote process restarted!");
276 this->TriggerLabelExchangeEvent(inTarget);
277 RefObject::DecObjRef(this);
278 });
279 if (errCode != E_OK) {
280 LOGD("ReTrigger label exchange failed! errCode = %d", errCode);
281 RefObject::DecObjRef(this);
282 }
283 }
284
TriggerLabelExchangeEvent(const std::string & toTarget)285 int CommunicatorLinker::TriggerLabelExchangeEvent(const std::string &toTarget)
286 {
287 // Apply for a latest sequenceId
288 uint64_t sequenceId = incSequenceId_.fetch_add(1, std::memory_order_seq_cst);
289 // Get a snapshot of current online labels
290 std::set<LabelType> onlineLabels;
291 {
292 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
293 onlineLabels = localOnlineLabels_;
294 }
295 // Build LabelExchange Frame
296 int error = E_OK;
297 SerialBuffer *buffer = ProtocolProto::BuildLabelExchange(localDistinctValue_, sequenceId, onlineLabels, error);
298 if (error != E_OK) {
299 LOGE("[Linker][TriggerLabel] BuildLabel fail, error=%d", error);
300 return error;
301 }
302 // Update waitAckSeq, Check whether new event be triggered in other thread
303 {
304 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
305 if (waitAckSeq_.count(toTarget) == 0) {
306 // Firstly send LabelExchange to this target
307 waitAckSeq_[toTarget] = sequenceId;
308 } else if (waitAckSeq_[toTarget] > sequenceId) {
309 // New LabelExchangeEvent had been trigger for this target, so this event can be abort
310 LOGI("[Linker][TriggerLabel] Detect newSeqId=%" PRIu64 " than thisSeqId=%" PRIu64
311 " be triggered for target=%s{private}", ULL(waitAckSeq_[toTarget]), ULL(sequenceId), toTarget.c_str());
312 delete buffer;
313 buffer = nullptr;
314 return E_OK;
315 } else {
316 waitAckSeq_[toTarget] = sequenceId;
317 }
318 }
319 // Synchronously call SendLabelExchange and hand over buffer to it
320 RefObject::IncObjRef(this); // SendLabelExchange will only DecRef when total done if no need to send
321 SendLabelExchange(toTarget, buffer, sequenceId, 0); // Initially retransmitCount is 0
322 return E_OK;
323 }
324
TriggerLabelExchangeAckEvent(const std::string & toTarget,uint64_t inSequenceId)325 int CommunicatorLinker::TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId)
326 {
327 // Build LabelExchangeAck Frame
328 int errCode = E_OK;
329 SerialBuffer *buffer = ProtocolProto::BuildLabelExchangeAck(localDistinctValue_, inSequenceId, errCode);
330 if (errCode != E_OK) {
331 LOGE("[Linker][TriggerAck] BuildAck fail, error=%d", errCode);
332 return errCode;
333 }
334 // Apply for a latest ackId and update ackTriggerId_
335 uint64_t ackId;
336 {
337 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
338 ackId = incAckTriggerId_.fetch_add(1, std::memory_order_seq_cst);
339 ackTriggerId_[toTarget] = ackId;
340 }
341 // Synchronously call SendLabelExchangeAck and hand over buffer to it
342 RefObject::IncObjRef(this); // SendLabelExchangeAck will only DecRef when total done if no need to send
343 SendLabelExchangeAck(toTarget, buffer, inSequenceId, ackId);
344 return E_OK;
345 }
346
347 namespace {
GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)348 inline uint32_t GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)
349 {
350 if (inRetransmitCount <= RETRANSMIT_LIMIT_EQUAL_INTERVAL) {
351 return TIME_LAPSE_FOR_WAITING_ACK;
352 }
353 uint32_t subsequentRetransmit = inRetransmitCount - RETRANSMIT_LIMIT_EQUAL_INTERVAL;
354 return subsequentRetransmit * subsequentRetransmit * TIME_LAPSE_FOR_WAITING_ACK;
355 }
356 }
357
SendLabelExchange(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint32_t inRetransmitCount)358 void CommunicatorLinker::SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId,
359 uint32_t inRetransmitCount)
360 {
361 // Check whether have the need to send
362 bool noNeedToSend = inRetransmitCount > RETRANSMIT_LIMIT;
363 {
364 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
365 if (remoteOnlineTarget_.count(toTarget) == 0) {
366 // Target offline
367 noNeedToSend = true;
368 }
369 if (waitAckSeq_[toTarget] > inSequenceId) {
370 // New LabelExchangeEvent had been trigger for this target, so this event can be abort
371 noNeedToSend = true;
372 }
373 if (recvAckSeq_.count(toTarget) != 0 && recvAckSeq_[toTarget] >= inSequenceId) {
374 // Ack of this sequenceId had been received or even later ack had been received
375 noNeedToSend = true;
376 }
377 if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
378 LOGI("[Linker][SendLabel] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", waitAckSeq=%" PRIu64
379 ", recvAckSeq=%" PRIu64 ",retrans=%u.", toTarget.c_str(), ULL(inSequenceId),
380 ULL(waitAckSeq_[toTarget]), ULL((recvAckSeq_.count(toTarget) != 0) ? recvAckSeq_[toTarget] : ~ULL(0)),
381 inRetransmitCount);
382 } // ~0 indicate no ack ever recv
383 }
384 if (noNeedToSend) {
385 delete inBuff;
386 inBuff = nullptr;
387 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
388 return;
389 }
390
391 int error = E_OK;
392 SerialBuffer *cloneBuffer = inBuff->Clone(error);
393 TaskConfig config{true, 0, Priority::HIGH};
394 int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE, config);
395 if (errCode == E_OK) {
396 // Send ok, go on to wait ack, and maybe resend
397 if (error == E_OK) {
398 SuspendByOnceTimer([this, toTarget, cloneBuffer, inSequenceId, inRetransmitCount]() {
399 // Note: toTarget and cloneBuffer and inSequenceId should be captured by value (must not by reference)
400 SendLabelExchange(toTarget, cloneBuffer, inSequenceId, inRetransmitCount + 1); // Do retransmission
401 }, GetDynamicTimeLapseForWaitingAck(inRetransmitCount));
402 } else {
403 LOGE("[Linker][SendLabel] CloneFail: target=%s{private}, SeqId=%" PRIu64 ".",
404 toTarget.c_str(), ULL(inSequenceId));
405 }
406 } else {
407 // Send fail, go on to retry send
408 SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inRetransmitCount]() {
409 // Note: toTarget and inBuff and inSequenceId should be captured by value (must not by reference)
410 SendLabelExchange(toTarget, inBuff, inSequenceId, inRetransmitCount); // Just do retry send
411 }, TIME_LAPSE_FOR_RETRY_SEND);
412 if (error == E_OK) {
413 delete cloneBuffer;
414 cloneBuffer = nullptr;
415 }
416 }
417 }
418
SendLabelExchangeAck(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint64_t inAckTriggerId)419 void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff,
420 uint64_t inSequenceId, uint64_t inAckTriggerId)
421 {
422 // Check whether have the need to send
423 bool noNeedToSend = false;
424 {
425 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
426 // Now that LabelExchange is received, LabelExchangeAck should be send no matter target online or not
427 if (topRecvLabelSeq_.count(toTarget) != 0 && topRecvLabelSeq_[toTarget] > inSequenceId) {
428 // topRecvLabelSeq for this target may have been erased, detect it for avoid creating an entry
429 // New LabelExchange had been received for this target, so this event can be abort
430 noNeedToSend = true;
431 }
432 if (ackTriggerId_[toTarget] > inAckTriggerId) {
433 // New LabelExchangeAck had been trigger for this target, so this event can be abort
434 noNeedToSend = true;
435 }
436 if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
437 LOGI("[Linker][SendAck] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", topRecLabelSeq=%" PRIu64
438 ", thisAckId=%" PRIu64 ",ackTriggerId=%" PRIu64 ".",
439 toTarget.c_str(), ULL(inSequenceId), // ~0 indacate no label ever recv
440 ULL((topRecvLabelSeq_.count(toTarget) != 0) ? topRecvLabelSeq_[toTarget] : ~ULL(0)),
441 ULL(inAckTriggerId), ULL(ackTriggerId_[toTarget]));
442 }
443 }
444 if (noNeedToSend) {
445 delete inBuff;
446 inBuff = nullptr;
447 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
448 return;
449 }
450
451 TaskConfig config{true, 0, Priority::HIGH};
452 int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK, config);
453 if (errCode == E_OK) {
454 // Send ok, finish event
455 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
456 } else {
457 // Send fail, go on to retry send
458 SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inAckTriggerId]() {
459 // Note: toTarget, inBuff, inSequenceId, inAckTriggerId should be captured by value (must not by reference)
460 SendLabelExchangeAck(toTarget, inBuff, inSequenceId, inAckTriggerId);
461 }, TIME_LAPSE_FOR_RETRY_SEND);
462 }
463 }
464
465 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker)
466 } // namespace DistributedDB
467