• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_aggregator.h"
17 
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26 
27 namespace DistributedDB {
namespace {
// Tuning constants for send retries.
// NOTE(review): judging by the names these are a retry cap and a divisor for the retry interval — confirm at use sites.
constexpr int MAX_SEND_RETRY = 2;
constexpr int RETRY_TIME_SPLIT = 4;

// Renders the id of the calling thread as text so it can be embedded in log messages.
inline std::string GetThreadId()
{
    std::ostringstream idText;
    idText << std::this_thread::get_id();
    return idText.str();
}
}
38 
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40 
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42     : shutdown_(false),
43       incFrameId_(0),
44       localSourceId_(0)
45 {
46 }
47 
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50     scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51     adapterHandle_ = nullptr;
52     commLinker_ = nullptr;
53 }
54 
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter)
56 {
57     if (inAdapter == nullptr) {
58         return -E_INVALID_ARGS;
59     }
60     adapterHandle_ = inAdapter;
61 
62     combiner_.Initialize();
63     retainer_.Initialize();
64     scheduler_.Initialize();
65 
66     int errCode;
67     commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter);
68     if (commLinker_ == nullptr) {
69         errCode = -E_OUT_OF_MEMORY;
70         goto ROLL_BACK;
71     }
72     commLinker_->Initialize();
73 
74     errCode = RegCallbackToAdapter();
75     if (errCode != E_OK) {
76         goto ROLL_BACK;
77     }
78 
79     errCode = adapterHandle_->StartAdapter();
80     if (errCode != E_OK) {
81         LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82         goto ROLL_BACK;
83     }
84     GenerateLocalSourceId();
85 
86     shutdown_ = false;
87     InitSendThread();
88     dbStatusAdapter_ = statusAdapter;
89     RegDBChangeCallback();
90     return E_OK;
91 ROLL_BACK:
92     UnRegCallbackFromAdapter();
93     if (commLinker_ != nullptr) {
94         RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
95         commLinker_ = nullptr;
96     }
97     // Scheduler do not need to do finalize in this roll_back
98     retainer_.Finalize();
99     combiner_.Finalize();
100     return errCode;
101 }
102 
Finalize()103 void CommunicatorAggregator::Finalize()
104 {
105     shutdown_ = true;
106     retryCv_.notify_all();
107     {
108         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
109         wakingSignal_ = true;
110         wakingCv_.notify_one();
111     }
112     if (useExclusiveThread_) {
113         exclusiveThread_.join(); // Waiting thread to thoroughly quit
114         LOGI("[CommAggr][Final] Sub Thread Exit.");
115     } else {
116         LOGI("[CommAggr][Final] Begin wait send task exit.");
117         std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
118         finalizeCv_.wait(scheduleSendTaskLock, [this]() {
119             return !sendTaskStart_;
120         });
121         LOGI("[CommAggr][Final] End wait send task exit.");
122     }
123     scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
124 
125     adapterHandle_->StopAdapter();
126     UnRegCallbackFromAdapter();
127     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
128 
129     // No callback now and later, so combiner, retainer and linker can finalize or delete safely
130     RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
131     commLinker_ = nullptr;
132     retainer_.Finalize();
133     combiner_.Finalize();
134     dbStatusAdapter_ = nullptr;
135 }
136 
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)137 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId)
138 {
139     uint64_t netOrderLabel = HostToNet(commLabel);
140     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
141     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
142     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
143         realLabel[i] = eachByte[i];
144     }
145     return AllocCommunicator(realLabel, outErrorNo, userId);
146 }
147 
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo,const std::string & userId)148 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo,
149     const std::string &userId)
150 {
151     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
152     LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
153     if (commLabel.size() != COMM_LABEL_LENGTH) {
154         outErrorNo = -E_INVALID_ARGS;
155         return nullptr;
156     }
157 
158     if (commMap_.count(userId) != 0 && commMap_[userId].count(commLabel) != 0) {
159         outErrorNo = -E_ALREADY_ALLOC;
160         return nullptr;
161     }
162 
163     Communicator *commPtr = new(std::nothrow) Communicator(this, commLabel);
164     if (commPtr == nullptr) {
165         LOGE("[CommAggr][Alloc] Communicator create failed, may be no available memory.");
166         outErrorNo = -E_OUT_OF_MEMORY;
167         return nullptr;
168     }
169     commMap_[userId][commLabel] = {commPtr, false}; // Communicator is not activated when allocated
170     return commPtr;
171 }
172 
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)173 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
174 {
175     if (inCommunicator == nullptr) {
176         return;
177     }
178     Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
179     LabelType commLabel = commPtr->GetCommunicatorLabel();
180     LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
181 
182     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
183     if (commMap_.count(userId) == 0 || commMap_[userId].count(commLabel) == 0) {
184         LOGE("[CommAggr][Release] Not Found.");
185         return;
186     }
187     commMap_[userId].erase(commLabel);
188     if (commMap_[userId].empty()) {
189         commMap_.erase(userId);
190     }
191     RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
192 
193     int errCode = commLinker_->DecreaseLocalLabel(commLabel);
194     if (errCode != E_OK) {
195         LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
196     }
197 }
198 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)199 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
200     const Finalizer &inOper)
201 {
202     std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
203     return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
204 }
205 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)206 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
207 {
208     std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
209     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
210     if (onConnect && errCode == E_OK) {
211         // Register action and success
212         std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
213         for (auto &entry : onlineTargets) {
214             LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
215             onConnectHandle_(entry, true);
216         }
217     }
218     return errCode;
219 }
220 
GetCommunicatorAggregatorMtuSize() const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
222 {
223     return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
224 }
225 
GetCommunicatorAggregatorMtuSize(const std::string & target) const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
227 {
228     return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
229 }
230 
GetCommunicatorAggregatorTimeout() const231 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
232 {
233     return adapterHandle_->GetTimeout();
234 }
235 
GetCommunicatorAggregatorTimeout(const std::string & target) const236 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
237 {
238     return adapterHandle_->GetTimeout(target);
239 }
240 
IsDeviceOnline(const std::string & device) const241 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
242 {
243     return adapterHandle_->IsDeviceOnline(device);
244 }
245 
GetLocalIdentity(std::string & outTarget) const246 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
247 {
248     return adapterHandle_->GetLocalIdentity(outTarget);
249 }
250 
ActivateCommunicator(const LabelType & commLabel,const std::string & userId)251 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, const std::string &userId)
252 {
253     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
254     LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
255     if (commMap_[userId].count(commLabel) == 0) {
256         LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
257         return;
258     }
259     if (commMap_[userId].at(commLabel).second) {
260         return;
261     }
262     commMap_[userId].at(commLabel).second = true; // Mark this communicator as activated
263 
264     // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
265     // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
266     std::set<std::string> onlineTargets;
267     int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
268     if (errCode != E_OK) {
269         LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
270         // Do not return here
271     }
272     for (auto &entry : onlineTargets) {
273         LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
274         commMap_[userId].at(commLabel).first->OnConnectChange(entry, true);
275     }
276     // Do Redeliver, the communicator is responsible to deal with the frame
277     std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
278     for (auto &entry : framesToRedeliver) {
279         commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer, entry.sendUser,
280             entry.remoteDbVersion);
281     }
282 }
283 
284 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)285 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
286 {
287     if (onEnd) {
288         TaskAction onSendEndTask = [onEnd, result]() {
289             LOGD("[CommAggr][SendEndTask] Before On Send End.");
290             onEnd(result, true);
291             LOGD("[CommAggr][SendEndTask] After On Send End.");
292         };
293         int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
294         if (errCode != E_OK) {
295             LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
296         }
297     }
298 }
299 }
300 
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)301 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
302     FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
303 {
304     if (inBuff == nullptr) {
305         return -E_INVALID_ARGS;
306     }
307 
308     if (!ReGenerateLocalSourceIdIfNeed()) {
309         delete inBuff;
310         inBuff = nullptr;
311         DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
312         LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
313         return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
314     }
315     bool sendLabelExchange = true;
316     if (dbStatusAdapter_ != nullptr) {
317         sendLabelExchange = dbStatusAdapter_->IsSendLabelExchange();
318     }
319     PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType,
320         sendLabelExchange};
321     int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
322     if (errCode != E_OK) {
323         LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
324         return errCode;
325     }
326     {
327         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
328         sendRecord_[info.frameId] = {};
329     }
330     SendTask task{inBuff, dstTarget, onEnd, info.frameId, true, inConfig.isRetryTask, inConfig.infos};
331     if (inConfig.nonBlock) {
332         errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
333     } else {
334         errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
335     }
336     if (errCode != E_OK) {
337         LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
338         return errCode;
339     }
340     TriggerSendData();
341     LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u, isRetry=%d", dstTarget.c_str(), info.frameId,
342         task.isRetryTask);
343     return E_OK;
344 }
345 
EnableCommunicatorNotFoundFeedback(bool isEnable)346 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
347 {
348     isCommunicatorNotFoundFeedbackEnable_ = isEnable;
349 }
350 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const351 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
352 {
353     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
354     auto pair = versionMap_.find(target);
355     if (pair == versionMap_.end()) {
356         return -E_NOT_FOUND;
357     }
358     outVersion = pair->second;
359     return E_OK;
360 }
361 
SendDataRoutine()362 void CommunicatorAggregator::SendDataRoutine()
363 {
364     while (!shutdown_) {
365         if (scheduler_.GetNoDelayTaskCount() == 0) {
366             std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
367             LOGI("[CommAggr][Routine] Send done and sleep.");
368             wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
369             LOGI("[CommAggr][Routine] Send continue.");
370             wakingSignal_ = false;
371             continue;
372         }
373         SendOnceData();
374     }
375 }
376 
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)377 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
378     const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
379 {
380     bool taskNeedFinalize = true;
381     int errCode = E_OK;
382     ResetFrameRecordIfNeed(inTask.frameId, mtu);
383     uint32_t startIndex;
384     {
385         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
386         startIndex = sendRecord_[inTask.frameId].sendIndex;
387     }
388     uint64_t currentSendSequenceId = IncreaseSendSequenceId(inTask.dstTarget);
389     DeviceInfos deviceInfos = {inTask.dstTarget, inTask.infos, inTask.isRetryTask};
390     for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()) && inTask.isValid; ++index) {
391         auto &entry = eachPacket[index];
392         LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
393             ", packetLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
394         ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
395         errCode = adapterHandle_->SendBytes(deviceInfos, entry.first, entry.second.second, totalLength);
396         {
397             std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
398             sendRecord_[inTask.frameId].sendIndex = index;
399         }
400         if (errCode == -E_WAIT_RETRY) {
401             LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
402             taskNeedFinalize = false;
403             break;
404         } else if (errCode != E_OK) {
405             LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
406             break;
407         } else {
408             std::lock_guard<std::mutex> autoLock(retryCountMutex_);
409             retryCount_[inTask.dstTarget] = 0;
410         }
411     }
412     if (errCode == -E_WAIT_RETRY) {
413         RetrySendTaskIfNeed(inTask.dstTarget, currentSendSequenceId);
414     }
415     if (taskNeedFinalize) {
416         TaskFinalizer(inTask, errCode);
417         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
418         sendRecord_.erase(inTask.frameId);
419     }
420 }
421 
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)422 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
423 {
424     int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
425     if (errCode != E_OK) {
426         bool notTimeout = true;
427         auto retryFunc = [this, inPrio, &inTask]()->bool {
428             if (this->shutdown_) {
429                 delete inTask.buffer;
430                 inTask.buffer = nullptr;
431                 return true;
432             }
433             int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
434             if (retCode != E_OK) {
435                 return false;
436             }
437             return true;
438         };
439 
440         if (timeout == 0) { // Unlimited retry
441             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
442             retryCv_.wait(retryUniqueLock, retryFunc);
443         } else {
444             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
445             notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
446         }
447 
448         if (shutdown_) {
449             return E_OK;
450         }
451         if (!notTimeout) {
452             return -E_TIMEOUT;
453         }
454     }
455     return E_OK;
456 }
457 
TaskFinalizer(const SendTask & inTask,int result)458 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
459 {
460     // Call the OnSendEnd if need
461     if (inTask.onEnd) {
462         LOGD("[CommAggr][TaskFinal] On Send End.");
463         inTask.onEnd(result, true);
464     }
465     // Finalize the task that just scheduled
466     int errCode = scheduler_.FinalizeLastScheduleTask();
467     // Notify Sendable To All Communicator If Need
468     if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
469         retryCv_.notify_all();
470     }
471     if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
472         NotifySendableToAllCommunicator();
473     }
474 }
475 
NotifySendableToAllCommunicator()476 void CommunicatorAggregator::NotifySendableToAllCommunicator()
477 {
478     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
479     for (auto &userCommMap : commMap_) {
480         for (auto &entry : userCommMap.second) {
481             // Ignore nonactivated communicator
482             if (entry.second.second) {
483                 entry.second.first->OnSendAvailable();
484             }
485         }
486     }
487 }
488 
OnBytesReceive(const ReceiveBytesInfo & receiveBytesInfo,const DataUserInfoProc & userInfoProc)489 void CommunicatorAggregator::OnBytesReceive(const ReceiveBytesInfo &receiveBytesInfo,
490     const DataUserInfoProc &userInfoProc)
491 {
492     ProtocolProto::DisplayPacketInformation(receiveBytesInfo.bytes, receiveBytesInfo.length);
493     ParseResult packetResult;
494     int errCode = ProtocolProto::CheckAndParsePacket(receiveBytesInfo.srcTarget, receiveBytesInfo.bytes,
495         receiveBytesInfo.length, packetResult);
496     if (errCode != E_OK) {
497         LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
498         if (errCode == -E_VERSION_NOT_SUPPORT) {
499             TriggerVersionNegotiation(receiveBytesInfo.srcTarget);
500         }
501         return;
502     }
503 
504     // Update version of remote target
505     SetRemoteCommunicatorVersion(receiveBytesInfo.srcTarget, packetResult.GetDbVersion());
506     if (dbStatusAdapter_ != nullptr) {
507         dbStatusAdapter_->SetRemoteOptimizeCommunication(receiveBytesInfo.srcTarget,
508             !packetResult.IsSendLabelExchange());
509     }
510     if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
511         LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
512         return;
513     }
514 
515     if (packetResult.IsFragment()) {
516         OnFragmentReceive(receiveBytesInfo, packetResult, userInfoProc);
517     } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
518         errCode = OnCommLayerFrameReceive(receiveBytesInfo.srcTarget, packetResult);
519         if (errCode != E_OK) {
520             LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
521         }
522     } else {
523         errCode = OnAppLayerFrameReceive(receiveBytesInfo, packetResult, userInfoProc);
524         if (errCode != E_OK) {
525             LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
526         }
527     }
528 }
529 
OnTargetChange(const std::string & target,bool isConnect)530 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
531 {
532     if (target.empty()) {
533         LOGE("[CommAggr][OnTarget] Target empty string.");
534         return;
535     }
536     // For process level target change
537     {
538         std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
539         if (onConnectHandle_) {
540             onConnectHandle_(target, isConnect);
541             LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
542         } else {
543             LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
544         }
545     }
546     std::set<LabelType> relatedLabels;
547     // For communicator level target change
548     if (isConnect) {
549         int errCode = commLinker_->TargetOnline(target, relatedLabels);
550         if (errCode != E_OK) {
551             LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
552         }
553     } else {
554         commLinker_->TargetOffline(target, relatedLabels);
555     }
556     // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
557     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
558     for (auto &userCommMap : commMap_) {
559         for (auto &entry: userCommMap.second) {
560             // Ignore nonactivated communicator
561             if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
562                 entry.second.first->OnConnectChange(target, isConnect);
563             }
564         }
565     }
566 }
567 
OnSendable(const std::string & target)568 void CommunicatorAggregator::OnSendable(const std::string &target)
569 {
570     int errCode = scheduler_.NoDelayTaskByTarget(target);
571     if (errCode != E_OK) {
572         LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
573         return;
574     }
575     TriggerSendData();
576 }
577 
OnFragmentReceive(const ReceiveBytesInfo & receiveBytesInfo,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)578 void CommunicatorAggregator::OnFragmentReceive(const ReceiveBytesInfo &receiveBytesInfo,
579     const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
580 {
581     int errorNo = E_OK;
582     ParseResult frameResult;
583     SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(receiveBytesInfo.bytes, receiveBytesInfo.length,
584         inResult, frameResult, errorNo);
585     if (errorNo != E_OK) {
586         LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
587         return;
588     }
589     if (frameBuffer == nullptr) {
590         LOGW("[CommAggr][Receive] Combine undone.");
591         return;
592     }
593 
594     int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
595     if (errCode != E_OK) {
596         LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
597         delete frameBuffer;
598         frameBuffer = nullptr;
599         if (errCode == -E_VERSION_NOT_SUPPORT) {
600             TriggerVersionNegotiation(receiveBytesInfo.srcTarget);
601         }
602         return;
603     }
604 
605     if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
606         errCode = OnCommLayerFrameReceive(receiveBytesInfo.srcTarget, frameResult);
607         if (errCode != E_OK) {
608             LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
609         }
610         delete frameBuffer;
611         frameBuffer = nullptr;
612     } else {
613         errCode = OnAppLayerFrameReceive(receiveBytesInfo, frameBuffer, frameResult, userInfoProc);
614         if (errCode != E_OK) {
615             LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
616         }
617     }
618 }
619 
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)620 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
621 {
622     if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
623         int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
624             inResult.GetLabelExchangeSequenceId());
625         if (errCode != E_OK) {
626             LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
627             return errCode;
628         }
629     } else {
630         std::map<LabelType, bool> changedLabels;
631         int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
632             inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
633         if (errCode != E_OK) {
634             LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
635             return errCode;
636         }
637         NotifyConnectChange(srcTarget, changedLabels);
638     }
639     return E_OK;
640 }
641 
OnAppLayerFrameReceive(const ReceiveBytesInfo & receiveBytesInfo,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)642 int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo,
643     const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
644 {
645     SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
646     if (buffer == nullptr) {
647         LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
648         return -E_OUT_OF_MEMORY;
649     }
650     int errCode = buffer->SetExternalBuff(receiveBytesInfo.bytes, receiveBytesInfo.length - inResult.GetPaddingLen(),
651         ProtocolProto::GetAppLayerFrameHeaderLength());
652     if (errCode != E_OK) {
653         LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
654         delete buffer;
655         buffer = nullptr;
656         return -E_INTERNAL_ERROR;
657     }
658     return OnAppLayerFrameReceive(receiveBytesInfo, buffer, inResult, userInfoProc);
659 }
660 
// Originally, "OnAppLayerFrameReceive" ran entirely under commMapMutex_: we searched for the communicator and, if
// it was not found, called onCommLackHandle_ (if it exists) to ask whether this frame should be retained. If the
// answer was yes we retained the frame; otherwise we discarded it and sent out CommunicatorNotFound feedback.
// The function was designed this way (in particular, fully covered by commMapMutex_) to avoid the following situation:
// 1: This function finds that the target communicator is not allocated or activated, so it decides to retain the frame.
// 2: The thread is switched out; the target communicator gets allocated and activated, and the previously retained
//    frames are fetched out.
// 3: The thread is switched back and this frame is then put into the retainer, with no chance of being fetched out.
// In conclusion: the decision to retain a frame and the action of retaining it should not be separated.
// Otherwise, by the time the action is taken, the retain decision may be obsolete and wrong.
// #### BUT #### the onCommLackHandle_ callback goes beyond DistributedDB, and there is a risk that the final upper
// user does something such as GetKvStore (we cannot prevent them from doing so), which could result in
// AllocCommunicator being called in the same callback thread and finally cause a deadlock on commMapMutex_.
// #### SO #### we have to change the procedure as described below:
// 1: Search for the communicator under commMapMutex_; if found, deliver the frame to that communicator and finish.
// 2: Call onCommLackHandle_ (if it exists) to ask whether to retain this frame or not, without holding commMapMutex_.
//    Note: during this period commMap_ may change, and a communicator that was not found before may exist now.
// 3: Search for the communicator under commMapMutex_ again; if found, deliver the frame to it and finish.
// 4: If it is still not found, retain this frame if needed, or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const ReceiveBytesInfo & receiveBytesInfo,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)679 int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo,
680     SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
681 {
682     LabelType toLabel = inResult.GetCommLabel();
683     uint16_t remoteDbVersion = inResult.GetDbVersion();
684     UserInfo userInfo = { .sendUser = DBConstant::DEFAULT_USER };
685     if (receiveBytesInfo.isNeedGetUserInfo) {
686         int ret = GetDataUserId(inResult, toLabel, userInfoProc, receiveBytesInfo.srcTarget, userInfo);
687         if (ret == NEED_CORRECT_TARGET_USER) {
688             TryToFeedBackWithErr(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer,
689                 E_NEED_CORRECT_TARGET_USER);
690             delete inFrameBuffer;
691             inFrameBuffer = nullptr;
692             return -E_NEED_CORRECT_TARGET_USER;
693         }
694         if (ret != E_OK || userInfo.sendUser.empty()) {
695             LOGE("[CommAggr][AppReceive] get data user id err, ret=%d, empty receiveUser=%d, empty sendUser=%d", ret,
696                 userInfo.receiveUser.empty(), userInfo.sendUser.empty());
697             delete inFrameBuffer;
698             inFrameBuffer = nullptr;
699             return ret != E_OK ? ret : -E_NO_TRUSTED_USER;
700         }
701     }
702     {
703         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
704         int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(remoteDbVersion, receiveBytesInfo.srcTarget,
705             inFrameBuffer, toLabel, userInfo);
706         if (errCode == E_OK) { // Attention: Here is equal to E_OK
707             return E_OK;
708         } else if (errCode == -E_FEEDBACK_DB_CLOSING) {
709             TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer,
710                 E_FEEDBACK_DB_CLOSING);
711             delete inFrameBuffer;
712             inFrameBuffer = nullptr;
713             return errCode; // The caller will display errCode in log
714         }
715     }
716     LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
717     return ReTryDeliverAppLayerFrameOnCommunicatorNotFound(receiveBytesInfo, inFrameBuffer, inResult, userInfoProc,
718         userInfo);
719 }
720 
GetDataUserId(const ParseResult & inResult,const LabelType & toLabel,const DataUserInfoProc & userInfoProc,const std::string & device,UserInfo & userInfo)721 int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel,
722     const DataUserInfoProc &userInfoProc, const std::string &device, UserInfo &userInfo)
723 {
724     if (userInfoProc.processCommunicator == nullptr) {
725         LOGE("[CommAggr][GetDataUserId] processCommunicator is nullptr");
726         return E_INVALID_ARGS;
727     }
728     std::string label(toLabel.begin(), toLabel.end());
729     std::vector<UserInfo> userInfos;
730     DataUserInfo dataUserInfo = {userInfoProc.data, userInfoProc.length, label, device};
731     DBStatus ret = userInfoProc.processCommunicator->GetDataUserInfo(dataUserInfo, userInfos);
732     LOGI("[CommAggr][GetDataUserId] get data user info, ret=%d", ret);
733     if (ret == NO_PERMISSION) {
734         LOGE("[CommAggr][GetDataUserId] userId dismatched, drop packet");
735         return ret;
736     } else if (ret == NEED_CORRECT_TARGET_USER) {
737         LOGW("[CommAggr][GetDataUserId] the target user is incorrect and needs to be corrected");
738         return ret;
739     }
740     if (!userInfos.empty()) {
741         userInfo = userInfos[0];
742     } else {
743         LOGW("[CommAggr][GetDataUserId] userInfos is empty");
744     }
745     return E_OK;
746 }
747 
TryDeliverAppLayerFrameToCommunicatorNoMutex(uint16_t remoteDbVersion,const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel,const UserInfo & userInfo)748 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(uint16_t remoteDbVersion,
749     const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo)
750 {
751     // Ignore nonactivated communicator, which is regarded as inexistent
752     const std::string &sendUser = userInfo.sendUser;
753     const std::string &receiveUser = userInfo.receiveUser;
754     if (commMap_[receiveUser].count(toLabel) != 0 && commMap_[receiveUser].at(toLabel).second) {
755         int ret = commMap_[receiveUser].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer, sendUser,
756             remoteDbVersion);
757         // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
758         if (ret == E_OK) {
759             inFrameBuffer = nullptr;
760         }
761         return ret;
762     }
763     Communicator *communicator = nullptr;
764     bool isEmpty = false;
765     for (auto &userCommMap : commMap_) {
766         for (auto &entry : userCommMap.second) {
767             if (entry.first == toLabel && entry.second.second) {
768                 communicator = entry.second.first;
769                 isEmpty = userCommMap.first.empty();
770                 LOGW("[CommAggr][TryDeliver] Found communicator of %s, but required user is %s",
771                      userCommMap.first.c_str(), receiveUser.c_str());
772                 break;
773             }
774         }
775         if (communicator != nullptr) {
776             break;
777         }
778     }
779     if (communicator != nullptr && (receiveUser.empty() || isEmpty)) {
780         int ret = communicator->OnBufferReceive(srcTarget, inFrameBuffer, sendUser, remoteDbVersion);
781         if (ret == E_OK) {
782             inFrameBuffer = nullptr;
783         }
784         return ret;
785     }
786     LOGE("[CommAggr][TryDeliver] Communicator not found");
787     return -E_NOT_FOUND;
788 }
789 
RegCallbackToAdapter()790 int CommunicatorAggregator::RegCallbackToAdapter()
791 {
792     RefObject::IncObjRef(this); // Reference to be hold by adapter
793     int errCode = adapterHandle_->RegBytesReceiveCallback(
794         [this](const ReceiveBytesInfo &receiveBytesInfo, const DataUserInfoProc &userInfoProc) {
795             OnBytesReceive(receiveBytesInfo, userInfoProc);
796         }, [this]() { RefObject::DecObjRef(this); });
797     if (errCode != E_OK) {
798         RefObject::DecObjRef(this); // Rollback in case reg failed
799         return errCode;
800     }
801 
802     RefObject::IncObjRef(this); // Reference to be hold by adapter
803     errCode = adapterHandle_->RegTargetChangeCallback(
804         [this](const std::string &target, bool isConnect) { OnTargetChange(target, isConnect); },
805         [this]() { RefObject::DecObjRef(this); });
806     if (errCode != E_OK) {
807         RefObject::DecObjRef(this); // Rollback in case reg failed
808         return errCode;
809     }
810 
811     RefObject::IncObjRef(this); // Reference to be hold by adapter
812     errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int deviceCommErrCode) {
813             LOGI("[CommAggr] Send able dev=%.3s, deviceCommErrCode=%d", target.c_str(), deviceCommErrCode);
814             (void)IncreaseSendSequenceId(target);
815             scheduler_.SetDeviceCommErrCode(target, deviceCommErrCode);
816             OnSendable(target);
817         },
818         [this]() { RefObject::DecObjRef(this); });
819     if (errCode != E_OK) {
820         RefObject::DecObjRef(this); // Rollback in case reg failed
821         return errCode;
822     }
823 
824     return E_OK;
825 }
826 
UnRegCallbackFromAdapter()827 void CommunicatorAggregator::UnRegCallbackFromAdapter()
828 {
829     adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
830     adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
831     adapterHandle_->RegSendableCallback(nullptr, nullptr);
832     if (dbStatusAdapter_ != nullptr) {
833         dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr, nullptr);
834     }
835 }
836 
GenerateLocalSourceId()837 void CommunicatorAggregator::GenerateLocalSourceId()
838 {
839     std::string identity;
840     adapterHandle_->GetLocalIdentity(identity);
841     // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
842     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
843     uint64_t identityHash = Hash::HashFunc(identity);
844     if (identityHash != localSourceId_) {
845         LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
846     }
847     localSourceId_ = identityHash;
848 }
849 
ReGenerateLocalSourceIdIfNeed()850 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
851 {
852     // The deviceId will change when switch user from A to B
853     // We can't listen to the user change, because it's hard to ensure the timing is correct.
854     // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
855     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
856     GenerateLocalSourceId();
857     return (localSourceId_ != 0);
858 }
859 
TriggerVersionNegotiation(const std::string & dstTarget)860 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
861 {
862     LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
863     int errCode = E_OK;
864     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
865     if (errCode != E_OK) {
866         LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
867         return;
868     }
869 
870     TaskConfig config{true, true, 0, Priority::HIGH};
871     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
872     if (errCode != E_OK) {
873         LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
874         // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
875         delete buffer;
876         buffer = nullptr;
877     }
878 }
879 
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame,int inErrCode)880 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
881     const LabelType &dstLabel, const SerialBuffer *inOriFrame, int inErrCode)
882 {
883     if (!isCommunicatorNotFoundFeedbackEnable_) {
884         return;
885     }
886     TryToFeedBackWithErr(dstTarget, dstLabel, inOriFrame, inErrCode);
887 }
888 
TryToFeedBackWithErr(const std::string & dstTarget,const DistributedDB::LabelType & dstLabel,const DistributedDB::SerialBuffer * inOriFrame,int inErrCode)889 void CommunicatorAggregator::TryToFeedBackWithErr(const std::string &dstTarget,
890     const DistributedDB::LabelType &dstLabel, const DistributedDB::SerialBuffer *inOriFrame, int inErrCode)
891 {
892     if (dstTarget.empty() || inOriFrame == nullptr) {
893         return;
894     }
895     int errCode = E_OK;
896     Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
897     if (message == nullptr) {
898         if (errCode == -E_VERSION_NOT_SUPPORT) {
899             TriggerVersionNegotiation(dstTarget);
900         }
901         return;
902     }
903     // Message is release in TriggerCommunicatorNotFoundFeedback
904     TriggerCommunicatorFeedback(dstTarget, dstLabel, message, inErrCode);
905 }
906 
TriggerCommunicatorFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg,int sendErrNo)907 void CommunicatorAggregator::TriggerCommunicatorFeedback(const std::string &dstTarget,
908     const LabelType &dstLabel, Message* &oriMsg, int sendErrNo)
909 {
910     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
911         LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
912         // Do not have to do feedback if the message is not a request type message
913         delete oriMsg;
914         oriMsg = nullptr;
915         return;
916     }
917 
918     LOGI("[CommAggr][TrigNotFound] Do communicator feedback with target=%s{private}, send error code=%d.",
919         dstTarget.c_str(), sendErrNo);
920     oriMsg->SetMessageType(TYPE_RESPONSE);
921     oriMsg->SetErrorNo(sendErrNo);
922 
923     int errCode = E_OK;
924     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
925     delete oriMsg;
926     oriMsg = nullptr;
927     if (errCode != E_OK) {
928         LOGE("[CommAggr][TrigNotFound] Build communicator feedback frame fail, errCode=%d", errCode);
929         return;
930     }
931 
932     TaskConfig config{true, true, 0, Priority::HIGH};
933     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
934     if (errCode != E_OK) {
935         LOGE("[CommAggr][TrigNotFound] Send communicator feedback frame fail, errCode=%d", errCode);
936         // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
937         delete buffer;
938         buffer = nullptr;
939     }
940 }
941 
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)942 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
943 {
944     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
945     versionMap_[target] = version;
946 }
947 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)948 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
949 {
950     if (adapterHandle_ == nullptr) {
951         return nullptr;
952     }
953     return adapterHandle_->GetExtendHeaderHandle(paramInfo);
954 }
955 
OnRemoteDBStatusChange(const std::string & devInfo,const std::vector<DBInfo> & dbInfos)956 void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos)
957 {
958     std::map<LabelType, bool> changedLabels;
959     for (const auto &dbInfo: dbInfos) {
960         std::string label = DBCommon::GenerateHashLabel(dbInfo);
961         LabelType labelType(label.begin(), label.end());
962         changedLabels[labelType] = dbInfo.isNeedSync;
963     }
964     if (commLinker_ != nullptr) {
965         commLinker_->UpdateOnlineLabels(devInfo, changedLabels);
966     }
967     NotifyConnectChange(devInfo, changedLabels);
968 }
969 
NotifyConnectChange(const std::string & srcTarget,const std::map<LabelType,bool> & changedLabels)970 void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget,
971     const std::map<LabelType, bool> &changedLabels)
972 {
973     if (commLinker_ != nullptr && !commLinker_->IsRemoteTargetOnline(srcTarget)) {
974         LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str());
975         for (const auto &entry : changedLabels) {
976             LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
977         }
978         return;
979     }
980     // Do target change notify
981     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
982     for (auto &entry : changedLabels) {
983         for (auto &userCommMap : commMap_) {
984             // Ignore nonactivated communicator
985             if (userCommMap.second.count(entry.first) != 0 && userCommMap.second.at(entry.first).second) {
986                 LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.",
987                      VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
988                 userCommMap.second.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
989             }
990         }
991     }
992 }
993 
RegDBChangeCallback()994 void CommunicatorAggregator::RegDBChangeCallback()
995 {
996     if (dbStatusAdapter_ != nullptr) {
997         dbStatusAdapter_->SetDBStatusChangeCallback(
998             [this](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
999                 OnRemoteDBStatusChange(devInfo, dbInfos);
1000             },
1001             [this]() {
1002                 if (commLinker_ != nullptr) {
1003                     (void)commLinker_->TriggerLabelExchangeEvent(false);
1004                 }
1005             },
1006             [this](const std::string &dev) {
1007                 if (commLinker_ != nullptr) {
1008                     std::set<LabelType> relatedLabels;
1009                     (void)commLinker_->TargetOnline(dev, relatedLabels);
1010                 }
1011             });
1012     }
1013 }
InitSendThread()1014 void CommunicatorAggregator::InitSendThread()
1015 {
1016     if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
1017         return;
1018     }
1019     exclusiveThread_ = std::thread([this] { SendDataRoutine(); });
1020     useExclusiveThread_ = true;
1021 }
1022 
SendOnceData()1023 void CommunicatorAggregator::SendOnceData()
1024 {
1025     SendTask taskToSend;
1026     uint32_t totalLength = 0;
1027     int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
1028     if (errCode != E_OK) {
1029         return; // Not possible to happen
1030     }
1031     // <vector, extendHeadSize>
1032     std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
1033     uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
1034     if (taskToSend.buffer == nullptr) {
1035         LOGE("[CommAggr] buffer of taskToSend is nullptr.");
1036         return;
1037     }
1038     errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
1039     if (errCode != E_OK) {
1040         LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
1041         TaskFinalizer(taskToSend, errCode);
1042         return;
1043     }
1044     // <addr, <extendHeadSize, totalLen>>
1045     std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
1046     if (piecePackets.empty()) {
1047         // Case that no need to split a frame, just use original buffer as a packet
1048         std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
1049         std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
1050         entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
1051         entry.second.first = taskToSend.buffer->GetExtendHeadLength();
1052         entry.second.second = tmpEntry.second + entry.second.first;
1053         eachPacket.push_back(entry);
1054     } else {
1055         for (auto &entry : piecePackets) {
1056             std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
1057                 {entry.second, entry.first.size()}};
1058             eachPacket.push_back(tmpEntry);
1059         }
1060     }
1061 
1062     SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
1063 }
1064 
TriggerSendData()1065 void CommunicatorAggregator::TriggerSendData()
1066 {
1067     if (useExclusiveThread_) {
1068         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
1069         wakingSignal_ = true;
1070         wakingCv_.notify_one();
1071         return;
1072     }
1073     {
1074         std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1075         if (sendTaskStart_) {
1076             return;
1077         }
1078         sendTaskStart_ = true;
1079     }
1080     RefObject::IncObjRef(this);
1081     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
1082         LOGI("[CommAggr] Send thread start.");
1083         while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1084             SendOnceData();
1085         }
1086         {
1087             std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1088             sendTaskStart_ = false;
1089         }
1090         if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1091             TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
1092         }
1093         finalizeCv_.notify_one();
1094         RefObject::DecObjRef(this);
1095         LOGI("[CommAggr] Send thread end.");
1096     });
1097     if (errCode != E_OK) {
1098         LOGW("[CommAggr] Trigger send data failed %d", errCode);
1099         RefObject::DecObjRef(this);
1100     }
1101 }
1102 
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)1103 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
1104 {
1105     std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
1106     if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
1107         sendRecord_[frameId].splitMtu = mtu;
1108         sendRecord_[frameId].sendIndex = 0u;
1109     }
1110 }
1111 
RetrySendTaskIfNeed(const std::string & target,uint64_t sendSequenceId)1112 void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId)
1113 {
1114     if (IsRetryOutOfLimit(target)) {
1115         LOGD("[CommAggr] Retry send task is out of limit! target is %s{private}", target.c_str());
1116         scheduler_.InvalidSendTask(target);
1117         std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1118         retryCount_[target] = 0;
1119     } else {
1120         RetrySendTask(target, sendSequenceId);
1121         if (sendSequenceId != GetSendSequenceId(target)) {
1122             LOGD("[CommAggr] %.3s Send sequence id has changed", target.c_str());
1123             return;
1124         }
1125         scheduler_.DelayTaskByTarget(target);
1126     }
1127 }
1128 
RetrySendTask(const std::string & target,uint64_t sendSequenceId)1129 void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t sendSequenceId)
1130 {
1131     int32_t currentRetryCount = 0;
1132     {
1133         std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1134         retryCount_[target]++;
1135         currentRetryCount = retryCount_[target];
1136         LOGD("[CommAggr] Target %s{private} retry count is %" PRId32, target.c_str(), currentRetryCount);
1137     }
1138     TimerId timerId = 0u;
1139     RefObject::IncObjRef(this);
1140     (void)RuntimeContext::GetInstance()->SetTimer(GetNextRetryInterval(target, currentRetryCount),
1141         [this, target, sendSequenceId](TimerId id) {
1142         if (sendSequenceId == GetSendSequenceId(target)) {
1143             OnSendable(target);
1144         } else {
1145             LOGD("[CommAggr] %.3s Send sequence id has changed in timer", target.c_str());
1146         }
1147         RefObject::DecObjRef(this);
1148         return -E_END_TIMER;
1149     }, nullptr, timerId);
1150 }
1151 
IsRetryOutOfLimit(const std::string & target)1152 bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target)
1153 {
1154     std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1155     return retryCount_[target] >= MAX_SEND_RETRY;
1156 }
1157 
GetNextRetryInterval(const std::string & target,int32_t currentRetryCount)1158 int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount)
1159 {
1160     uint32_t timeout = DBConstant::MIN_TIMEOUT;
1161     if (adapterHandle_ != nullptr) {
1162         timeout = adapterHandle_->GetTimeout(target);
1163     }
1164     return static_cast<int32_t>(timeout) * currentRetryCount / RETRY_TIME_SPLIT;
1165 }
1166 
GetSendSequenceId(const std::string & target)1167 uint64_t CommunicatorAggregator::GetSendSequenceId(const std::string &target)
1168 {
1169     std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1170     return sendSequence_[target];
1171 }
1172 
IncreaseSendSequenceId(const std::string & target)1173 uint64_t CommunicatorAggregator::IncreaseSendSequenceId(const std::string &target)
1174 {
1175     std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1176     return ++sendSequence_[target];
1177 }
1178 
ClearOnlineLabel()1179 void CommunicatorAggregator::ClearOnlineLabel()
1180 {
1181     std::lock_guard<std::mutex> autoLock(commMapMutex_);
1182     if (commLinker_ == nullptr) {
1183         LOGE("[CommAggr] clear online label with null linker");
1184         return;
1185     }
1186     commLinker_->ClearOnlineLabel();
1187 }
1188 
ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo & receiveBytesInfo,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const DataUserInfoProc & userInfoProc,const UserInfo & userInfo)1189 int CommunicatorAggregator::ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo,
1190     SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc,
1191     const UserInfo &userInfo)
1192 {
1193     LabelType toLabel = inResult.GetCommLabel();
1194     uint16_t remoteDbVersion = inResult.GetDbVersion();
1195     int errCode = -E_NOT_FOUND;
1196     {
1197         std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
1198         if (onCommLackHandle_) {
1199             errCode = onCommLackHandle_(toLabel, userInfo.receiveUser);
1200             LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
1201         } else {
1202             LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
1203         }
1204     }
1205     // Here we have to lock commMapMutex_ and search communicator again.
1206     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
1207     int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(remoteDbVersion, receiveBytesInfo.srcTarget,
1208         inFrameBuffer, toLabel, userInfo);
1209     if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
1210         LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
1211         return E_OK;
1212     }
1213     // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
1214     if (errCode != E_OK || errCodeAgain == -E_FEEDBACK_DB_CLOSING) {
1215         TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer,
1216             errCodeAgain == -E_FEEDBACK_DB_CLOSING ? E_FEEDBACK_DB_CLOSING : E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
1217         if (inFrameBuffer != nullptr) {
1218             delete inFrameBuffer;
1219             inFrameBuffer = nullptr;
1220         }
1221         return errCode == E_OK ? errCodeAgain : errCode; // The caller will display errCode in log
1222     }
1223     // Do Retention, the retainer is responsible to deal with the frame
1224     retainer_.RetainFrame(FrameInfo{inFrameBuffer, receiveBytesInfo.srcTarget, userInfo.sendUser, toLabel,
1225         inResult.GetFrameId(), remoteDbVersion});
1226     inFrameBuffer = nullptr;
1227     return E_OK;
1228 }
1229 
ResetRetryCount()1230 void CommunicatorAggregator::ResetRetryCount()
1231 {
1232     std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1233     retryCount_.clear();
1234 }
1235 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
1236 } // namespace DistributedDB
1237