• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_aggregator.h"
17 
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26 
27 namespace DistributedDB {
28 namespace {
29 constexpr int MAX_SEND_RETRY = 2;
30 constexpr int RETRY_TIME_SPLIT = 4;
GetThreadId()31 inline std::string GetThreadId()
32 {
33     std::stringstream stream;
34     stream << std::this_thread::get_id();
35     return stream.str();
36 }
37 }
38 
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40 
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42     : shutdown_(false),
43       incFrameId_(0),
44       localSourceId_(0)
45 {
46 }
47 
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50     scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51     adapterHandle_ = nullptr;
52     commLinker_ = nullptr;
53 }
54 
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter)
56 {
57     if (inAdapter == nullptr) {
58         return -E_INVALID_ARGS;
59     }
60     adapterHandle_ = inAdapter;
61 
62     combiner_.Initialize();
63     retainer_.Initialize();
64     scheduler_.Initialize();
65 
66     int errCode;
67     commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter);
68     if (commLinker_ == nullptr) {
69         errCode = -E_OUT_OF_MEMORY;
70         goto ROLL_BACK;
71     }
72     commLinker_->Initialize();
73 
74     errCode = RegCallbackToAdapter();
75     if (errCode != E_OK) {
76         goto ROLL_BACK;
77     }
78 
79     errCode = adapterHandle_->StartAdapter();
80     if (errCode != E_OK) {
81         LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82         goto ROLL_BACK;
83     }
84     GenerateLocalSourceId();
85 
86     shutdown_ = false;
87     InitSendThread();
88     dbStatusAdapter_ = statusAdapter;
89     RegDBChangeCallback();
90     return E_OK;
91 ROLL_BACK:
92     UnRegCallbackFromAdapter();
93     if (commLinker_ != nullptr) {
94         RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
95         commLinker_ = nullptr;
96     }
97     // Scheduler do not need to do finalize in this roll_back
98     retainer_.Finalize();
99     combiner_.Finalize();
100     return errCode;
101 }
102 
Finalize()103 void CommunicatorAggregator::Finalize()
104 {
105     shutdown_ = true;
106     retryCv_.notify_all();
107     {
108         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
109         wakingSignal_ = true;
110         wakingCv_.notify_one();
111     }
112     if (useExclusiveThread_) {
113         exclusiveThread_.join(); // Waiting thread to thoroughly quit
114         LOGI("[CommAggr][Final] Sub Thread Exit.");
115     } else {
116         LOGI("[CommAggr][Final] Begin wait send task exit.");
117         std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
118         finalizeCv_.wait(scheduleSendTaskLock, [this]() {
119             return !sendTaskStart_;
120         });
121         LOGI("[CommAggr][Final] End wait send task exit.");
122     }
123     scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
124 
125     adapterHandle_->StopAdapter();
126     UnRegCallbackFromAdapter();
127     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
128 
129     // No callback now and later, so combiner, retainer and linker can finalize or delete safely
130     RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
131     commLinker_ = nullptr;
132     retainer_.Finalize();
133     combiner_.Finalize();
134     dbStatusAdapter_ = nullptr;
135 }
136 
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)137 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId)
138 {
139     uint64_t netOrderLabel = HostToNet(commLabel);
140     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
141     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
142     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
143         realLabel[i] = eachByte[i];
144     }
145     return AllocCommunicator(realLabel, outErrorNo, userId);
146 }
147 
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo,const std::string & userId)148 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo,
149     const std::string &userId)
150 {
151     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
152     LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
153     if (commLabel.size() != COMM_LABEL_LENGTH) {
154         outErrorNo = -E_INVALID_ARGS;
155         return nullptr;
156     }
157 
158     if (commMap_.count(userId) != 0 && commMap_[userId].count(commLabel) != 0) {
159         outErrorNo = -E_ALREADY_ALLOC;
160         return nullptr;
161     }
162 
163     Communicator *commPtr = new(std::nothrow) Communicator(this, commLabel);
164     if (commPtr == nullptr) {
165         LOGE("[CommAggr][Alloc] Communicator create failed, may be no available memory.");
166         outErrorNo = -E_OUT_OF_MEMORY;
167         return nullptr;
168     }
169     commMap_[userId][commLabel] = {commPtr, false}; // Communicator is not activated when allocated
170     return commPtr;
171 }
172 
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)173 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
174 {
175     if (inCommunicator == nullptr) {
176         return;
177     }
178     Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
179     LabelType commLabel = commPtr->GetCommunicatorLabel();
180     LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
181 
182     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
183     if (commMap_.count(userId) == 0 || commMap_[userId].count(commLabel) == 0) {
184         LOGE("[CommAggr][Release] Not Found.");
185         return;
186     }
187     commMap_[userId].erase(commLabel);
188     if (commMap_[userId].empty()) {
189         commMap_.erase(userId);
190     }
191     RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
192 
193     int errCode = commLinker_->DecreaseLocalLabel(commLabel);
194     if (errCode != E_OK) {
195         LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
196     }
197 }
198 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)199 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
200     const Finalizer &inOper)
201 {
202     std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
203     return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
204 }
205 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)206 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
207 {
208     std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
209     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
210     if (onConnect && errCode == E_OK) {
211         // Register action and success
212         std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
213         for (auto &entry : onlineTargets) {
214             LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
215             onConnectHandle_(entry, true);
216         }
217     }
218     return errCode;
219 }
220 
GetCommunicatorAggregatorMtuSize() const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
222 {
223     return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
224 }
225 
GetCommunicatorAggregatorMtuSize(const std::string & target) const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
227 {
228     return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
229 }
230 
GetCommunicatorAggregatorTimeout() const231 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
232 {
233     return adapterHandle_->GetTimeout();
234 }
235 
GetCommunicatorAggregatorTimeout(const std::string & target) const236 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
237 {
238     return adapterHandle_->GetTimeout(target);
239 }
240 
IsDeviceOnline(const std::string & device) const241 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
242 {
243     return adapterHandle_->IsDeviceOnline(device);
244 }
245 
GetLocalIdentity(std::string & outTarget) const246 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
247 {
248     return adapterHandle_->GetLocalIdentity(outTarget);
249 }
250 
ActivateCommunicator(const LabelType & commLabel,const std::string & userId)251 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, const std::string &userId)
252 {
253     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
254     LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
255     if (commMap_[userId].count(commLabel) == 0) {
256         LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
257         return;
258     }
259     if (commMap_[userId].at(commLabel).second) {
260         return;
261     }
262     commMap_[userId].at(commLabel).second = true; // Mark this communicator as activated
263 
264     // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
265     // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
266     std::set<std::string> onlineTargets;
267     int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
268     if (errCode != E_OK) {
269         LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
270         // Do not return here
271     }
272     for (auto &entry : onlineTargets) {
273         LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
274         commMap_[userId].at(commLabel).first->OnConnectChange(entry, true);
275     }
276     // Do Redeliver, the communicator is responsible to deal with the frame
277     std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
278     for (auto &entry : framesToRedeliver) {
279         commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
280     }
281 }
282 
283 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)284 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
285 {
286     if (onEnd) { // LCOV_EXCL_BR_LINE
287         TaskAction onSendEndTask = [onEnd, result]() {
288             LOGD("[CommAggr][SendEndTask] Before On Send End.");
289             onEnd(result, true);
290             LOGD("[CommAggr][SendEndTask] After On Send End.");
291         };
292         int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
293         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
294             LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
295         }
296     }
297 }
298 }
299 
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)300 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
301     FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
302 {
303     if (inBuff == nullptr) {
304         return -E_INVALID_ARGS;
305     }
306 
307     if (!ReGenerateLocalSourceIdIfNeed()) {
308         delete inBuff;
309         inBuff = nullptr;
310         DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
311         LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
312         return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
313     }
314     bool sendLabelExchange = true;
315     if (dbStatusAdapter_ != nullptr) {
316         sendLabelExchange = dbStatusAdapter_->IsSendLabelExchange();
317     }
318     PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType,
319         sendLabelExchange};
320     int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
321     if (errCode != E_OK) {
322         LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
323         return errCode;
324     }
325     {
326         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
327         sendRecord_[info.frameId] = {};
328     }
329     SendTask task{inBuff, dstTarget, onEnd, info.frameId, true};
330     if (inConfig.nonBlock) {
331         errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
332     } else {
333         errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
334     }
335     if (errCode != E_OK) {
336         LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
337         return errCode;
338     }
339     TriggerSendData();
340     LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u", dstTarget.c_str(), info.frameId);
341     return E_OK;
342 }
343 
EnableCommunicatorNotFoundFeedback(bool isEnable)344 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
345 {
346     isCommunicatorNotFoundFeedbackEnable_ = isEnable;
347 }
348 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const349 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
350 {
351     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
352     auto pair = versionMap_.find(target);
353     if (pair == versionMap_.end()) { // LCOV_EXCL_BR_LINE
354         return -E_NOT_FOUND;
355     }
356     outVersion = pair->second;
357     return E_OK;
358 }
359 
SendDataRoutine()360 void CommunicatorAggregator::SendDataRoutine()
361 {
362     while (!shutdown_) {
363         if (scheduler_.GetNoDelayTaskCount() == 0) {
364             std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
365             LOGI("[CommAggr][Routine] Send done and sleep.");
366             wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
367             LOGI("[CommAggr][Routine] Send continue.");
368             wakingSignal_ = false;
369             continue;
370         }
371         SendOnceData();
372     }
373 }
374 
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)375 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
376     const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
377 {
378     bool taskNeedFinalize = true;
379     int errCode = E_OK;
380     ResetFrameRecordIfNeed(inTask.frameId, mtu);
381     uint32_t startIndex;
382     {
383         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
384         startIndex = sendRecord_[inTask.frameId].sendIndex;
385     }
386     uint64_t currentSendSequenceId = IncreaseSendSequenceId(inTask.dstTarget);
387     for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()) && inTask.isValid; ++index) {
388         auto &entry = eachPacket[index];
389         LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
390             ", packetLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
391         ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
392         errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second, totalLength);
393         {
394             std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
395             sendRecord_[inTask.frameId].sendIndex = index;
396         }
397         if (errCode == -E_WAIT_RETRY) {
398             LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
399             taskNeedFinalize = false;
400             break;
401         } else if (errCode != E_OK) {
402             LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
403             break;
404         } else {
405             std::lock_guard<std::mutex> autoLock(retryCountMutex_);
406             retryCount_[inTask.dstTarget] = 0;
407         }
408     }
409     if (errCode == -E_WAIT_RETRY) {
410         RetrySendTaskIfNeed(inTask.dstTarget, currentSendSequenceId);
411     }
412     if (taskNeedFinalize) {
413         TaskFinalizer(inTask, errCode);
414         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
415         sendRecord_.erase(inTask.frameId);
416     }
417 }
418 
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)419 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
420 {
421     int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
422     if (errCode != E_OK) {
423         bool notTimeout = true;
424         auto retryFunc = [this, inPrio, &inTask]()->bool {
425             if (this->shutdown_) {
426                 delete inTask.buffer;
427                 inTask.buffer = nullptr;
428                 return true;
429             }
430             int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
431             if (retCode != E_OK) {
432                 return false;
433             }
434             return true;
435         };
436 
437         if (timeout == 0) { // Unlimited retry
438             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
439             retryCv_.wait(retryUniqueLock, retryFunc);
440         } else {
441             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
442             notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
443         }
444 
445         if (shutdown_) {
446             return E_OK;
447         }
448         if (!notTimeout) {
449             return -E_TIMEOUT;
450         }
451     }
452     return E_OK;
453 }
454 
TaskFinalizer(const SendTask & inTask,int result)455 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
456 {
457     // Call the OnSendEnd if need
458     if (inTask.onEnd) {
459         LOGD("[CommAggr][TaskFinal] On Send End.");
460         inTask.onEnd(result, true);
461     }
462     // Finalize the task that just scheduled
463     int errCode = scheduler_.FinalizeLastScheduleTask();
464     // Notify Sendable To All Communicator If Need
465     if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
466         retryCv_.notify_all();
467     }
468     if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
469         NotifySendableToAllCommunicator();
470     }
471 }
472 
NotifySendableToAllCommunicator()473 void CommunicatorAggregator::NotifySendableToAllCommunicator()
474 {
475     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
476     for (auto &userCommMap : commMap_) {
477         for (auto &entry : userCommMap.second) {
478             // Ignore nonactivated communicator
479             if (entry.second.second) {
480                 entry.second.first->OnSendAvailable();
481             }
482         }
483     }
484 }
485 
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const DataUserInfoProc & userInfoProc)486 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
487     const DataUserInfoProc &userInfoProc)
488 {
489     ProtocolProto::DisplayPacketInformation(bytes, length);
490     ParseResult packetResult;
491     int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
492     if (errCode != E_OK) {
493         LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
494         if (errCode == -E_VERSION_NOT_SUPPORT) {
495             TriggerVersionNegotiation(srcTarget);
496         }
497         return;
498     }
499 
500     // Update version of remote target
501     SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
502     if (dbStatusAdapter_ != nullptr) {
503         dbStatusAdapter_->SetRemoteOptimizeCommunication(srcTarget, !packetResult.IsSendLabelExchange());
504     }
505     if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
506         LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
507         return;
508     }
509 
510     if (packetResult.IsFragment()) {
511         OnFragmentReceive(srcTarget, bytes, length, packetResult, userInfoProc);
512     } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
513         errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
514         if (errCode != E_OK) {
515             LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
516         }
517     } else {
518         errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userInfoProc);
519         if (errCode != E_OK) {
520             LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
521         }
522     }
523 }
524 
OnTargetChange(const std::string & target,bool isConnect)525 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
526 {
527     if (target.empty()) {
528         LOGE("[CommAggr][OnTarget] Target empty string.");
529         return;
530     }
531     // For process level target change
532     {
533         std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
534         if (onConnectHandle_) {
535             onConnectHandle_(target, isConnect);
536             LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
537         } else {
538             LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
539         }
540     }
541     std::set<LabelType> relatedLabels;
542     // For communicator level target change
543     if (isConnect) {
544         int errCode = commLinker_->TargetOnline(target, relatedLabels);
545         if (errCode != E_OK) {
546             LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
547         }
548     } else {
549         commLinker_->TargetOffline(target, relatedLabels);
550     }
551     // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
552     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
553     for (auto &userCommMap : commMap_) {
554         for (auto &entry: userCommMap.second) {
555             // Ignore nonactivated communicator
556             if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
557                 entry.second.first->OnConnectChange(target, isConnect);
558             }
559         }
560     }
561 }
562 
OnSendable(const std::string & target)563 void CommunicatorAggregator::OnSendable(const std::string &target)
564 {
565     int errCode = scheduler_.NoDelayTaskByTarget(target);
566     if (errCode != E_OK) {
567         LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
568         return;
569     }
570     TriggerSendData();
571 }
572 
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)573 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
574     const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
575 {
576     int errorNo = E_OK;
577     ParseResult frameResult;
578     SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
579     if (errorNo != E_OK) {
580         LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
581         return;
582     }
583     if (frameBuffer == nullptr) {
584         LOGW("[CommAggr][Receive] Combine undone.");
585         return;
586     }
587 
588     int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
589     if (errCode != E_OK) {
590         LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
591         delete frameBuffer;
592         frameBuffer = nullptr;
593         if (errCode == -E_VERSION_NOT_SUPPORT) {
594             TriggerVersionNegotiation(srcTarget);
595         }
596         return;
597     }
598 
599     if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
600         errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
601         if (errCode != E_OK) {
602             LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
603         }
604         delete frameBuffer;
605         frameBuffer = nullptr;
606     } else {
607         errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userInfoProc);
608         if (errCode != E_OK) {
609             LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
610         }
611     }
612 }
613 
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)614 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
615 {
616     if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
617         int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
618             inResult.GetLabelExchangeSequenceId());
619         if (errCode != E_OK) {
620             LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
621             return errCode;
622         }
623     } else {
624         std::map<LabelType, bool> changedLabels;
625         int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
626             inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
627         if (errCode != E_OK) {
628             LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
629             return errCode;
630         }
631         NotifyConnectChange(srcTarget, changedLabels);
632     }
633     return E_OK;
634 }
635 
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)636 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
637     uint32_t length, const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
638 {
639     SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
640     if (buffer == nullptr) {
641         LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
642         return -E_OUT_OF_MEMORY;
643     }
644     int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
645         ProtocolProto::GetAppLayerFrameHeaderLength());
646     if (errCode != E_OK) {
647         LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
648         delete buffer;
649         buffer = nullptr;
650         return -E_INTERNAL_ERROR;
651     }
652     return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userInfoProc);
653 }
654 
655 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
656 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
657 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
658 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
659 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
660 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
661 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
662 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
663 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
664 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
665 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
666 // in the same callback thread finally causing DeadLock on commMapMutex_.
667 // #### SO #### we have to make a change described below
668 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
669 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
670 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
671 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
672 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)673 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
674     const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
675 {
676     LabelType toLabel = inResult.GetCommLabel();
677     std::string userId;
678     int ret = GetDataUserId(inResult, toLabel, userInfoProc, userId);
679     if (ret != E_OK) {
680         LOGE("[CommAggr][AppReceive] get data user id err, ret=%d", ret);
681         delete inFrameBuffer;
682         inFrameBuffer = nullptr;
683         return ret;
684     }
685     {
686         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
687         int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userId);
688         if (errCode == E_OK) { // Attention: Here is equal to E_OK
689             return E_OK;
690         }
691     }
692     LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
693     int errCode = -E_NOT_FOUND;
694     {
695         std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
696         if (onCommLackHandle_) {
697             errCode = onCommLackHandle_(toLabel, userId);
698             LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
699         } else {
700             LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
701         }
702     }
703     // Here we have to lock commMapMutex_ and search communicator again.
704     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
705     int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userId);
706     if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
707         LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
708         return E_OK;
709     }
710     // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
711     if (errCode != E_OK) {
712         TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
713         delete inFrameBuffer;
714         inFrameBuffer = nullptr;
715         return errCode; // The caller will display errCode in log
716     }
717     // Do Retention, the retainer is responsible to deal with the frame
718     retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
719     inFrameBuffer = nullptr;
720     return E_OK;
721 }
722 
GetDataUserId(const ParseResult & inResult,const LabelType & toLabel,const DataUserInfoProc & userInfoProc,std::string & userId)723 int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel,
724     const DataUserInfoProc &userInfoProc, std::string &userId)
725 {
726     if (userInfoProc.processCommunicator == nullptr) {
727         LOGE("[CommAggr][GetDataUserId] processCommunicator is nullptr");
728         return E_INVALID_ARGS;
729     }
730     std::string label(toLabel.begin(), toLabel.end());
731     std::vector<UserInfo> userInfos;
732     DBStatus ret = userInfoProc.processCommunicator->GetDataUserInfo(userInfoProc.data, userInfoProc.length, label,
733         userInfos);
734     LOGI("[CommAggr][GetDataUserId] get data user info, ret=%d", ret);
735     if (ret == NO_PERMISSION) {
736         LOGE("[CommAggr][GetDataUserId] userId dismatched, drop packet");
737         return ret;
738     }
739     if (userInfos.size() >= 1) {
740         userId = userInfos[0].receiveUser;
741     } else {
742         LOGW("[CommAggr][GetDataUserId] userInfos is empty");
743     }
744     return E_OK;
745 }
746 
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel,const std::string & userId)747 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
748     SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const std::string &userId)
749 {
750     // Ignore nonactivated communicator, which is regarded as inexistent
751     if (commMap_[userId].count(toLabel) != 0 && commMap_[userId].at(toLabel).second) {
752         commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
753         // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
754         inFrameBuffer = nullptr;
755         return E_OK;
756     }
757     Communicator *communicator = nullptr;
758     bool isEmpty = false;
759     for (auto &userCommMap : commMap_) {
760         for (auto &entry : userCommMap.second) {
761             if (entry.first == toLabel && entry.second.second) {
762                 communicator = entry.second.first;
763                 isEmpty = userCommMap.first.empty();
764                 LOGW("[CommAggr][TryDeliver] Found communicator of %s, but required user is %s",
765                      userCommMap.first.c_str(), userId.c_str());
766                 break;
767             }
768         }
769         if (communicator != nullptr) {
770             break;
771         }
772     }
773     if (communicator != nullptr && (userId.empty() || isEmpty)) {
774         communicator->OnBufferReceive(srcTarget, inFrameBuffer);
775         inFrameBuffer = nullptr;
776         return E_OK;
777     }
778     LOGE("[CommAggr][TryDeliver] Communicator not found");
779     return -E_NOT_FOUND;
780 }
781 
RegCallbackToAdapter()782 int CommunicatorAggregator::RegCallbackToAdapter()
783 {
784     RefObject::IncObjRef(this); // Reference to be hold by adapter
785     int errCode = adapterHandle_->RegBytesReceiveCallback(
786         [this](const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
787             const DataUserInfoProc &userInfoProc) {
788             OnBytesReceive(srcTarget, bytes, length, userInfoProc);
789         }, [this]() { RefObject::DecObjRef(this); });
790     if (errCode != E_OK) {
791         RefObject::DecObjRef(this); // Rollback in case reg failed
792         return errCode;
793     }
794 
795     RefObject::IncObjRef(this); // Reference to be hold by adapter
796     errCode = adapterHandle_->RegTargetChangeCallback(
797         [this](const std::string &target, bool isConnect) { OnTargetChange(target, isConnect); },
798         [this]() { RefObject::DecObjRef(this); });
799     if (errCode != E_OK) {
800         RefObject::DecObjRef(this); // Rollback in case reg failed
801         return errCode;
802     }
803 
804     RefObject::IncObjRef(this); // Reference to be hold by adapter
805     errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int deviceCommErrCode) {
806             LOGI("[CommAggr] Send able dev=%.3s, deviceCommErrCode=%d", target.c_str(), deviceCommErrCode);
807             if (deviceCommErrCode == E_OK) {
808                 (void)IncreaseSendSequenceId(target);
809                 OnSendable(target);
810             }
811             scheduler_.SetDeviceCommErrCode(target, deviceCommErrCode);
812         },
813         [this]() { RefObject::DecObjRef(this); });
814     if (errCode != E_OK) {
815         RefObject::DecObjRef(this); // Rollback in case reg failed
816         return errCode;
817     }
818 
819     return E_OK;
820 }
821 
UnRegCallbackFromAdapter()822 void CommunicatorAggregator::UnRegCallbackFromAdapter()
823 {
824     adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
825     adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
826     adapterHandle_->RegSendableCallback(nullptr, nullptr);
827     if (dbStatusAdapter_ != nullptr) {
828         dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr, nullptr);
829     }
830 }
831 
GenerateLocalSourceId()832 void CommunicatorAggregator::GenerateLocalSourceId()
833 {
834     std::string identity;
835     adapterHandle_->GetLocalIdentity(identity);
836     // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
837     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
838     uint64_t identityHash = Hash::HashFunc(identity);
839     if (identityHash != localSourceId_) {
840         LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
841     }
842     localSourceId_ = identityHash;
843 }
844 
ReGenerateLocalSourceIdIfNeed()845 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
846 {
847     // The deviceId will change when switch user from A to B
848     // We can't listen to the user change, because it's hard to ensure the timing is correct.
849     // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
850     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
851     GenerateLocalSourceId();
852     return (localSourceId_ != 0);
853 }
854 
TriggerVersionNegotiation(const std::string & dstTarget)855 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
856 {
857     LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
858     int errCode = E_OK;
859     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
860     if (errCode != E_OK) {
861         LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
862         return;
863     }
864 
865     TaskConfig config{true, 0, Priority::HIGH};
866     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
867     if (errCode != E_OK) {
868         LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
869         // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
870         delete buffer;
871         buffer = nullptr;
872     }
873 }
874 
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)875 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
876     const LabelType &dstLabel, const SerialBuffer *inOriFrame)
877 {
878     if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
879         return;
880     }
881     int errCode = E_OK;
882     Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
883     if (message == nullptr) {
884         if (errCode == -E_VERSION_NOT_SUPPORT) {
885             TriggerVersionNegotiation(dstTarget);
886         }
887         return;
888     }
889     // Message is release in TriggerCommunicatorNotFoundFeedback
890     TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
891 }
892 
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)893 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
894     const LabelType &dstLabel, Message* &oriMsg)
895 {
896     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
897         LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
898         // Do not have to do feedback if the message is not a request type message
899         delete oriMsg;
900         oriMsg = nullptr;
901         return;
902     }
903 
904     LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
905     oriMsg->SetMessageType(TYPE_RESPONSE);
906     oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
907 
908     int errCode = E_OK;
909     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
910     delete oriMsg;
911     oriMsg = nullptr;
912     if (errCode != E_OK) {
913         LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
914         return;
915     }
916 
917     TaskConfig config{true, 0, Priority::HIGH};
918     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
919     if (errCode != E_OK) {
920         LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
921         // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
922         delete buffer;
923         buffer = nullptr;
924     }
925 }
926 
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)927 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
928 {
929     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
930     versionMap_[target] = version;
931 }
932 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)933 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
934 {
935     if (adapterHandle_ == nullptr) {
936         return nullptr;
937     }
938     return adapterHandle_->GetExtendHeaderHandle(paramInfo);
939 }
940 
OnRemoteDBStatusChange(const std::string & devInfo,const std::vector<DBInfo> & dbInfos)941 void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos)
942 {
943     std::map<LabelType, bool> changedLabels;
944     for (const auto &dbInfo: dbInfos) {
945         std::string label = DBCommon::GenerateHashLabel(dbInfo);
946         LabelType labelType(label.begin(), label.end());
947         changedLabels[labelType] = dbInfo.isNeedSync;
948     }
949     if (commLinker_ != nullptr) {
950         commLinker_->UpdateOnlineLabels(devInfo, changedLabels);
951     }
952     NotifyConnectChange(devInfo, changedLabels);
953 }
954 
NotifyConnectChange(const std::string & srcTarget,const std::map<LabelType,bool> & changedLabels)955 void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget,
956     const std::map<LabelType, bool> &changedLabels)
957 {
958     if (commLinker_ != nullptr && !commLinker_->IsRemoteTargetOnline(srcTarget)) {
959         LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str());
960         for (const auto &entry : changedLabels) {
961             LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
962         }
963         return;
964     }
965     // Do target change notify
966     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
967     for (auto &entry : changedLabels) {
968         for (auto &userCommMap : commMap_) {
969             // Ignore nonactivated communicator
970             if (userCommMap.second.count(entry.first) != 0 && userCommMap.second.at(entry.first).second) {
971                 LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.",
972                      VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
973                 userCommMap.second.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
974             }
975         }
976     }
977 }
978 
RegDBChangeCallback()979 void CommunicatorAggregator::RegDBChangeCallback()
980 {
981     if (dbStatusAdapter_ != nullptr) {
982         dbStatusAdapter_->SetDBStatusChangeCallback(
983             [this](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
984                 OnRemoteDBStatusChange(devInfo, dbInfos);
985             },
986             [this]() {
987                 if (commLinker_ != nullptr) {
988                     (void)commLinker_->TriggerLabelExchangeEvent(false);
989                 }
990             },
991             [this](const std::string &dev) {
992                 if (commLinker_ != nullptr) {
993                     std::set<LabelType> relatedLabels;
994                     (void)commLinker_->TargetOnline(dev, relatedLabels);
995                 }
996             });
997     }
998 }
InitSendThread()999 void CommunicatorAggregator::InitSendThread()
1000 {
1001     if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
1002         return;
1003     }
1004     exclusiveThread_ = std::thread([this] { SendDataRoutine(); });
1005     useExclusiveThread_ = true;
1006 }
1007 
SendOnceData()1008 void CommunicatorAggregator::SendOnceData()
1009 {
1010     SendTask taskToSend;
1011     uint32_t totalLength = 0;
1012     int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
1013     if (errCode != E_OK) {
1014         return; // Not possible to happen
1015     }
1016     // <vector, extendHeadSize>
1017     std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
1018     uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
1019     if (taskToSend.buffer == nullptr) {
1020         LOGE("[CommAggr] buffer of taskToSend is nullptr.");
1021         return;
1022     }
1023     errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
1024     if (errCode != E_OK) {
1025         LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
1026         TaskFinalizer(taskToSend, errCode);
1027         return;
1028     }
1029     // <addr, <extendHeadSize, totalLen>>
1030     std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
1031     if (piecePackets.empty()) {
1032         // Case that no need to split a frame, just use original buffer as a packet
1033         std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
1034         std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
1035         entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
1036         entry.second.first = taskToSend.buffer->GetExtendHeadLength();
1037         entry.second.second = tmpEntry.second + entry.second.first;
1038         eachPacket.push_back(entry);
1039     } else {
1040         for (auto &entry : piecePackets) {
1041             std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
1042                 {entry.second, entry.first.size()}};
1043             eachPacket.push_back(tmpEntry);
1044         }
1045     }
1046 
1047     SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
1048 }
1049 
TriggerSendData()1050 void CommunicatorAggregator::TriggerSendData()
1051 {
1052     if (useExclusiveThread_) {
1053         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
1054         wakingSignal_ = true;
1055         wakingCv_.notify_one();
1056         return;
1057     }
1058     {
1059         std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1060         if (sendTaskStart_) {
1061             return;
1062         }
1063         sendTaskStart_ = true;
1064     }
1065     RefObject::IncObjRef(this);
1066     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
1067         LOGI("[CommAggr] Send thread start.");
1068         while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1069             SendOnceData();
1070         }
1071         {
1072             std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1073             sendTaskStart_ = false;
1074         }
1075         if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1076             TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
1077         }
1078         finalizeCv_.notify_one();
1079         RefObject::DecObjRef(this);
1080         LOGI("[CommAggr] Send thread end.");
1081     });
1082     if (errCode != E_OK) {
1083         LOGW("[CommAggr] Trigger send data failed %d", errCode);
1084         RefObject::DecObjRef(this);
1085     }
1086 }
1087 
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)1088 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
1089 {
1090     std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
1091     if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
1092         sendRecord_[frameId].splitMtu = mtu;
1093         sendRecord_[frameId].sendIndex = 0u;
1094     }
1095 }
1096 
RetrySendTaskIfNeed(const std::string & target,uint64_t sendSequenceId)1097 void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId)
1098 {
1099     if (IsRetryOutOfLimit(target)) {
1100         LOGD("[CommAggr] Retry send task is out of limit! target is %s{private}", target.c_str());
1101         scheduler_.InvalidSendTask(target);
1102         std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1103         retryCount_[target] = 0;
1104     } else {
1105         if (sendSequenceId != GetSendSequenceId(target)) {
1106             LOGD("[CommAggr] %.3s Send sequence id has changed", target.c_str());
1107             return;
1108         }
1109         scheduler_.DelayTaskByTarget(target);
1110         RetrySendTask(target, sendSequenceId);
1111     }
1112 }
1113 
RetrySendTask(const std::string & target,uint64_t sendSequenceId)1114 void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t sendSequenceId)
1115 {
1116     int32_t currentRetryCount = 0;
1117     {
1118         std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1119         retryCount_[target]++;
1120         currentRetryCount = retryCount_[target];
1121         LOGD("[CommAggr] Target %s{private} retry count is %" PRId32, target.c_str(), currentRetryCount);
1122     }
1123     TimerId timerId = 0u;
1124     RefObject::IncObjRef(this);
1125     (void)RuntimeContext::GetInstance()->SetTimer(GetNextRetryInterval(target, currentRetryCount),
1126         [this, target, sendSequenceId](TimerId id) {
1127         if (sendSequenceId == GetSendSequenceId(target)) {
1128             OnSendable(target);
1129         } else {
1130             LOGD("[CommAggr] %.3s Send sequence id has changed in timer", target.c_str());
1131         }
1132         RefObject::DecObjRef(this);
1133         return -E_END_TIMER;
1134     }, nullptr, timerId);
1135 }
1136 
IsRetryOutOfLimit(const std::string & target)1137 bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target)
1138 {
1139     std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1140     return retryCount_[target] >= MAX_SEND_RETRY;
1141 }
1142 
GetNextRetryInterval(const std::string & target,int32_t currentRetryCount)1143 int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount)
1144 {
1145     uint32_t timeout = DBConstant::MIN_TIMEOUT;
1146     if (adapterHandle_ != nullptr) {
1147         timeout = adapterHandle_->GetTimeout(target);
1148     }
1149     return static_cast<int32_t>(timeout) * currentRetryCount / RETRY_TIME_SPLIT;
1150 }
1151 
GetSendSequenceId(const std::string & target)1152 uint64_t CommunicatorAggregator::GetSendSequenceId(const std::string &target)
1153 {
1154     std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1155     return sendSequence_[target];
1156 }
1157 
IncreaseSendSequenceId(const std::string & target)1158 uint64_t CommunicatorAggregator::IncreaseSendSequenceId(const std::string &target)
1159 {
1160     std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1161     return ++sendSequence_[target];
1162 }
1163 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
1164 } // namespace DistributedDB
1165