• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_aggregator.h"
17 
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26 
27 namespace DistributedDB {
28 namespace {
GetThreadId()29 inline std::string GetThreadId()
30 {
31     std::stringstream stream;
32     stream << std::this_thread::get_id();
33     return stream.str();
34 }
35 }
36 
37 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
38 
CommunicatorAggregator()39 CommunicatorAggregator::CommunicatorAggregator()
40     : shutdown_(false),
41       incFrameId_(0),
42       localSourceId_(0)
43 {
44 }
45 
~CommunicatorAggregator()46 CommunicatorAggregator::~CommunicatorAggregator()
47 {
48     scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
49     adapterHandle_ = nullptr;
50     commLinker_ = nullptr;
51 }
52 
Initialize(IAdapter * inAdapter)53 int CommunicatorAggregator::Initialize(IAdapter *inAdapter)
54 {
55     if (inAdapter == nullptr) {
56         return -E_INVALID_ARGS;
57     }
58     adapterHandle_ = inAdapter;
59 
60     combiner_.Initialize();
61     retainer_.Initialize();
62     scheduler_.Initialize();
63 
64     int errCode;
65     commLinker_ = new (std::nothrow) CommunicatorLinker(this);
66     if (commLinker_ == nullptr) {
67         errCode = -E_OUT_OF_MEMORY;
68         goto ROLL_BACK;
69     }
70     commLinker_->Initialize();
71 
72     errCode = RegCallbackToAdapter();
73     if (errCode != E_OK) {
74         goto ROLL_BACK;
75     }
76 
77     errCode = adapterHandle_->StartAdapter();
78     if (errCode != E_OK) {
79         LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
80         goto ROLL_BACK;
81     }
82     GenerateLocalSourceId();
83 
84     shutdown_ = false;
85     InitSendThread();
86     return E_OK;
87 ROLL_BACK:
88     UnRegCallbackFromAdapter();
89     if (commLinker_ != nullptr) {
90         RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
91         commLinker_ = nullptr;
92     }
93     // Scheduler do not need to do finalize in this roll_back
94     retainer_.Finalize();
95     combiner_.Finalize();
96     return errCode;
97 }
98 
Finalize()99 void CommunicatorAggregator::Finalize()
100 {
101     shutdown_ = true;
102     retryCv_.notify_all();
103     {
104         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
105         wakingSignal_ = true;
106         wakingCv_.notify_one();
107     }
108     if (useExclusiveThread_) {
109         exclusiveThread_.join(); // Waiting thread to thoroughly quit
110         LOGI("[CommAggr][Final] Sub Thread Exit.");
111     } else {
112         LOGI("[CommAggr][Final] Begin wait send task exit.");
113         std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
114         finalizeCv_.wait(scheduleSendTaskLock, [this]() {
115             return !sendTaskStart_;
116         });
117         LOGI("[CommAggr][Final] End wait send task exit.");
118     }
119     scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
120 
121     adapterHandle_->StopAdapter();
122     UnRegCallbackFromAdapter();
123     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
124 
125     // No callback now and later, so combiner, retainer and linker can finalize or delete safely
126     RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
127     commLinker_ = nullptr;
128     retainer_.Finalize();
129     combiner_.Finalize();
130 }
131 
AllocCommunicator(uint64_t commLabel,int & outErrorNo)132 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
133 {
134     uint64_t netOrderLabel = HostToNet(commLabel);
135     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
136     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
137     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
138         realLabel[i] = eachByte[i];
139     }
140     return AllocCommunicator(realLabel, outErrorNo);
141 }
142 
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)143 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
144 {
145     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
146     LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
147     if (commLabel.size() != COMM_LABEL_LENGTH) {
148         outErrorNo = -E_INVALID_ARGS;
149         return nullptr;
150     }
151 
152     if (commMap_.count(commLabel) != 0) {
153         outErrorNo = -E_ALREADY_ALLOC;
154         return nullptr;
155     }
156 
157     Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
158     if (commPtr == nullptr) {
159         outErrorNo = -E_OUT_OF_MEMORY;
160         return nullptr;
161     }
162     commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
163     return commPtr;
164 }
165 
ReleaseCommunicator(ICommunicator * inCommunicator)166 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
167 {
168     if (inCommunicator == nullptr) {
169         return;
170     }
171     Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
172     LabelType commLabel = commPtr->GetCommunicatorLabel();
173     LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
174 
175     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
176     if (commMap_.count(commLabel) == 0) {
177         LOGE("[CommAggr][Release] Not Found.");
178         return;
179     }
180     commMap_.erase(commLabel);
181     RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
182 
183     int errCode = commLinker_->DecreaseLocalLabel(commLabel);
184     if (errCode != E_OK) {
185         LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
186     }
187 }
188 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)189 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
190     const Finalizer &inOper)
191 {
192     std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
193     return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
194 }
195 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)196 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
197 {
198     std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
199     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
200     if (onConnect && errCode == E_OK) {
201         // Register action and success
202         std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
203         for (auto &entry : onlineTargets) {
204             LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
205             onConnectHandle_(entry, true);
206         }
207     }
208     return errCode;
209 }
210 
GetCommunicatorAggregatorMtuSize() const211 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
212 {
213     return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
214 }
215 
GetCommunicatorAggregatorMtuSize(const std::string & target) const216 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
217 {
218     return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
219 }
220 
GetCommunicatorAggregatorTimeout() const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
222 {
223     return adapterHandle_->GetTimeout();
224 }
225 
GetCommunicatorAggregatorTimeout(const std::string & target) const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
227 {
228     return adapterHandle_->GetTimeout(target);
229 }
230 
IsDeviceOnline(const std::string & device) const231 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
232 {
233     return adapterHandle_->IsDeviceOnline(device);
234 }
235 
GetLocalIdentity(std::string & outTarget) const236 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
237 {
238     return adapterHandle_->GetLocalIdentity(outTarget);
239 }
240 
ActivateCommunicator(const LabelType & commLabel)241 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
242 {
243     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
244     LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
245     if (commMap_.count(commLabel) == 0) {
246         LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
247         return;
248     }
249     if (commMap_.at(commLabel).second) {
250         return;
251     }
252     commMap_.at(commLabel).second = true; // Mark this communicator as activated
253 
254     // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
255     // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
256     std::set<std::string> onlineTargets;
257     int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
258     if (errCode != E_OK) {
259         LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
260         // Do not return here
261     }
262     for (auto &entry : onlineTargets) {
263         LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
264         commMap_.at(commLabel).first->OnConnectChange(entry, true);
265     }
266     // Do Redeliver, the communicator is responsible to deal with the frame
267     std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
268     for (auto &entry : framesToRedeliver) {
269         commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
270     }
271 }
272 
273 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)274 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
275 {
276     if (onEnd) {
277         TaskAction onSendEndTask = [onEnd, result]() {
278             LOGD("[CommAggr][SendEndTask] Before On Send End.");
279             onEnd(result);
280             LOGD("[CommAggr][SendEndTask] After On Send End.");
281         };
282         int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
283         if (errCode != E_OK) {
284             LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
285         }
286     }
287 }
288 }
289 
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)290 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
291     FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
292 {
293     if (inBuff == nullptr) {
294         return -E_INVALID_ARGS;
295     }
296 
297     if (!ReGenerateLocalSourceIdIfNeed()) {
298         delete inBuff;
299         inBuff = nullptr;
300         DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
301         LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
302         return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
303     }
304     PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType};
305     int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
306     if (errCode != E_OK) {
307         LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
308         return errCode;
309     }
310     {
311         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
312         sendRecord_[info.frameId] = {};
313     }
314     SendTask task{inBuff, dstTarget, onEnd, info.frameId};
315     if (inConfig.nonBlock) {
316         errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
317     } else {
318         errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
319     }
320     if (errCode != E_OK) {
321         LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
322         return errCode;
323     }
324     TriggerSendData();
325     LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u", dstTarget.c_str(), info.frameId);
326     return E_OK;
327 }
328 
EnableCommunicatorNotFoundFeedback(bool isEnable)329 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
330 {
331     isCommunicatorNotFoundFeedbackEnable_ = isEnable;
332 }
333 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const334 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
335 {
336     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
337     auto pair = versionMap_.find(target);
338     if (pair == versionMap_.end()) {
339         return -E_NOT_FOUND;
340     }
341     outVersion = pair->second;
342     return E_OK;
343 }
344 
SendDataRoutine()345 void CommunicatorAggregator::SendDataRoutine()
346 {
347     while (!shutdown_) {
348         if (scheduler_.GetNoDelayTaskCount() == 0) {
349             std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
350             LOGI("[CommAggr][Routine] Send done and sleep.");
351             wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
352             LOGI("[CommAggr][Routine] Send continue.");
353             wakingSignal_ = false;
354             continue;
355         }
356         SendOnceData();
357     }
358 }
359 
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)360 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
361     const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
362 {
363     bool taskNeedFinalize = true;
364     int errCode = E_OK;
365     ResetFrameRecordIfNeed(inTask.frameId, mtu);
366     uint32_t startIndex;
367     {
368         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
369         startIndex = sendRecord_[inTask.frameId].sendIndex;
370     }
371     for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()); ++index) {
372         auto &entry = eachPacket[index];
373         LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
374             ", totalLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
375         ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
376         errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second, totalLength);
377         {
378             std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
379             sendRecord_[inTask.frameId].sendIndex = index;
380         }
381         if (errCode == -E_WAIT_RETRY) {
382             LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
383             scheduler_.DelayTaskByTarget(inTask.dstTarget);
384             taskNeedFinalize = false;
385             break;
386         } else if (errCode != E_OK) {
387             LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
388             break;
389         }
390     }
391     if (errCode == -E_WAIT_RETRY) {
392         const int RETRY_INTERVAL = 1000;
393         TimerId timerId = 0u;
394         const std::string target = inTask.dstTarget;
395         RefObject::IncObjRef(this);
396         errCode = RuntimeContext::GetInstance()->SetTimer(RETRY_INTERVAL, [this, target](TimerId id) {
397             OnSendable(target);
398             RefObject::DecObjRef(this);
399             return -E_END_TIMER;
400         }, nullptr, timerId);
401     }
402     if (taskNeedFinalize) {
403         TaskFinalizer(inTask, errCode);
404         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
405         sendRecord_.erase(inTask.frameId);
406     }
407 }
408 
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)409 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
410 {
411     int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
412     if (errCode != E_OK) {
413         bool notTimeout = true;
414         auto retryFunc = [this, inPrio, &inTask]()->bool {
415             if (this->shutdown_) {
416                 delete inTask.buffer;
417                 inTask.buffer = nullptr;
418                 return true;
419             }
420             int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
421             if (retCode != E_OK) {
422                 return false;
423             }
424             return true;
425         };
426 
427         if (timeout == 0) { // Unlimited retry
428             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
429             retryCv_.wait(retryUniqueLock, retryFunc);
430         } else {
431             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
432             notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
433         }
434 
435         if (shutdown_) {
436             return E_OK;
437         }
438         if (!notTimeout) {
439             return -E_TIMEOUT;
440         }
441     }
442     return E_OK;
443 }
444 
TaskFinalizer(const SendTask & inTask,int result)445 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
446 {
447     // Call the OnSendEnd if need
448     if (inTask.onEnd) {
449         LOGD("[CommAggr][TaskFinal] On Send End.");
450         inTask.onEnd(result);
451     }
452     // Finalize the task that just scheduled
453     int errCode = scheduler_.FinalizeLastScheduleTask();
454     // Notify Sendable To All Communicator If Need
455     if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
456         retryCv_.notify_all();
457     }
458     if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
459         NotifySendableToAllCommunicator();
460     }
461 }
462 
NotifySendableToAllCommunicator()463 void CommunicatorAggregator::NotifySendableToAllCommunicator()
464 {
465     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
466     for (auto &entry : commMap_) {
467         // Ignore nonactivated communicator
468         if (entry.second.second) {
469             entry.second.first->OnSendAvailable();
470         }
471     }
472 }
473 
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)474 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
475     const std::string &userId)
476 {
477     ProtocolProto::DisplayPacketInformation(bytes, length);
478     ParseResult packetResult;
479     int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
480     if (errCode != E_OK) {
481         LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
482         if (errCode == -E_VERSION_NOT_SUPPORT) {
483             TriggerVersionNegotiation(srcTarget);
484         }
485         return;
486     }
487 
488     // Update version of remote target
489     SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
490     if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
491         LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
492         return;
493     }
494 
495     if (packetResult.IsFragment()) {
496         OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
497     } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
498         errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
499         if (errCode != E_OK) {
500             LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
501         }
502     } else {
503         errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
504         if (errCode != E_OK) {
505             LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
506         }
507     }
508 }
509 
OnTargetChange(const std::string & target,bool isConnect)510 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
511 {
512     if (target.empty()) {
513         LOGE("[CommAggr][OnTarget] Target empty string.");
514         return;
515     }
516     // For process level target change
517     {
518         std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
519         if (onConnectHandle_) {
520             onConnectHandle_(target, isConnect);
521             LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
522         } else {
523             LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
524         }
525     }
526     std::set<LabelType> relatedLabels;
527     // For communicator level target change
528     if (isConnect) {
529         int errCode = commLinker_->TargetOnline(target, relatedLabels);
530         if (errCode != E_OK) {
531             LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
532         }
533     } else {
534         commLinker_->TargetOffline(target, relatedLabels);
535     }
536     // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
537     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
538     for (auto &entry : commMap_) {
539         // Ignore nonactivated communicator
540         if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
541             entry.second.first->OnConnectChange(target, isConnect);
542         }
543     }
544 }
545 
OnSendable(const std::string & target)546 void CommunicatorAggregator::OnSendable(const std::string &target)
547 {
548     int errCode = scheduler_.NoDelayTaskByTarget(target);
549     if (errCode != E_OK) {
550         LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
551         return;
552     }
553     TriggerSendData();
554 }
555 
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)556 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
557     const ParseResult &inResult, const std::string &userId)
558 {
559     int errorNo = E_OK;
560     ParseResult frameResult;
561     SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
562     if (errorNo != E_OK) {
563         LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
564         return;
565     }
566     if (frameBuffer == nullptr) {
567         LOGW("[CommAggr][Receive] Combine undone.");
568         return;
569     }
570 
571     int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
572     if (errCode != E_OK) {
573         LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
574         delete frameBuffer;
575         frameBuffer = nullptr;
576         if (errCode == -E_VERSION_NOT_SUPPORT) {
577             TriggerVersionNegotiation(srcTarget);
578         }
579         return;
580     }
581 
582     if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
583         errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
584         if (errCode != E_OK) {
585             LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
586         }
587         delete frameBuffer;
588         frameBuffer = nullptr;
589     } else {
590         errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
591         if (errCode != E_OK) {
592             LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
593         }
594     }
595 }
596 
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)597 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
598 {
599     if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
600         int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
601             inResult.GetLabelExchangeSequenceId());
602         if (errCode != E_OK) {
603             LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
604             return errCode;
605         }
606     } else {
607         std::map<LabelType, bool> changedLabels;
608         int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
609             inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
610         if (errCode != E_OK) {
611             LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
612             return errCode;
613         }
614         if (!commLinker_->IsRemoteTargetOnline(srcTarget)) {
615             LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str());
616             for (const auto &entry : changedLabels) {
617                 LOGW("[CommAggr][CommReceive] REMEMBER: label=%.3s, inOnline=%d.", VEC_TO_STR(entry.first),
618                     entry.second);
619             }
620             return E_OK;
621         }
622         // Do target change notify
623         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
624         for (auto &entry : changedLabels) {
625             // Ignore nonactivated communicator
626             if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
627                 LOGI("[CommAggr][CommReceive] label=%.3s, srcTarget=%s{private}, isOnline=%d.",
628                     VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
629                 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
630             }
631         }
632     }
633     return E_OK;
634 }
635 
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)636 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
637     uint32_t length, const ParseResult &inResult, const std::string &userId)
638 {
639     SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
640     if (buffer == nullptr) {
641         LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
642         return -E_OUT_OF_MEMORY;
643     }
644     int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
645         ProtocolProto::GetAppLayerFrameHeaderLength());
646     if (errCode != E_OK) {
647         LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
648         delete buffer;
649         buffer = nullptr;
650         return -E_INTERNAL_ERROR;
651     }
652     return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
653 }
654 
655 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
656 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
657 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
658 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
659 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
660 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
661 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
662 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
663 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
664 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
665 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
666 // in the same callback thread finally causing DeadLock on commMapMutex_.
667 // #### SO #### we have to make a change described below
668 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
669 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
670 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
671 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
672 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)673 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
674     const ParseResult &inResult, const std::string &userId)
675 {
676     LabelType toLabel = inResult.GetCommLabel();
677     {
678         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
679         int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
680         if (errCode == E_OK) { // Attention: Here is equal to E_OK
681             return E_OK;
682         }
683     }
684     LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
685     int errCode = -E_NOT_FOUND;
686     {
687         std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
688         if (onCommLackHandle_) {
689             errCode = onCommLackHandle_(toLabel, userId);
690             LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
691         } else {
692             LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
693         }
694     }
695     // Here we have to lock commMapMutex_ and search communicator again.
696     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
697     int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
698     if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
699         LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
700         return E_OK;
701     }
702     // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
703     if (errCode != E_OK) {
704         TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
705         delete inFrameBuffer;
706         inFrameBuffer = nullptr;
707         return errCode; // The caller will display errCode in log
708     }
709     // Do Retention, the retainer is responsible to deal with the frame
710     retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
711     inFrameBuffer = nullptr;
712     return E_OK;
713 }
714 
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)715 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
716     SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
717 {
718     // Ignore nonactivated communicator, which is regarded as inexistent
719     if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
720         commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
721         // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
722         inFrameBuffer = nullptr;
723         return E_OK;
724     }
725     return -E_NOT_FOUND;
726 }
727 
RegCallbackToAdapter()728 int CommunicatorAggregator::RegCallbackToAdapter()
729 {
730     RefObject::IncObjRef(this); // Reference to be hold by adapter
731     int errCode = adapterHandle_->RegBytesReceiveCallback(
732         std::bind(&CommunicatorAggregator::OnBytesReceive, this, std::placeholders::_1, std::placeholders::_2,
733             std::placeholders::_3, std::placeholders::_4),
734         [this]() { RefObject::DecObjRef(this); });
735     if (errCode != E_OK) {
736         RefObject::DecObjRef(this); // Rollback in case reg failed
737         return errCode;
738     }
739 
740     RefObject::IncObjRef(this); // Reference to be hold by adapter
741     errCode = adapterHandle_->RegTargetChangeCallback(
742         std::bind(&CommunicatorAggregator::OnTargetChange, this, std::placeholders::_1, std::placeholders::_2),
743         [this]() { RefObject::DecObjRef(this); });
744     if (errCode != E_OK) {
745         RefObject::DecObjRef(this); // Rollback in case reg failed
746         return errCode;
747     }
748 
749     RefObject::IncObjRef(this); // Reference to be hold by adapter
750     errCode = adapterHandle_->RegSendableCallback(
751         std::bind(&CommunicatorAggregator::OnSendable, this, std::placeholders::_1),
752         [this]() { RefObject::DecObjRef(this); });
753     if (errCode != E_OK) {
754         RefObject::DecObjRef(this); // Rollback in case reg failed
755         return errCode;
756     }
757 
758     return E_OK;
759 }
760 
UnRegCallbackFromAdapter()761 void CommunicatorAggregator::UnRegCallbackFromAdapter()
762 {
763     adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
764     adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
765     adapterHandle_->RegSendableCallback(nullptr, nullptr);
766 }
767 
GenerateLocalSourceId()768 void CommunicatorAggregator::GenerateLocalSourceId()
769 {
770     std::string identity;
771     adapterHandle_->GetLocalIdentity(identity);
772     // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
773     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
774     uint64_t identityHash = Hash::HashFunc(identity);
775     if (identityHash != localSourceId_) {
776         LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
777     }
778     localSourceId_ = identityHash;
779 }
780 
ReGenerateLocalSourceIdIfNeed()781 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
782 {
783     // The deviceId will change when switch user from A to B
784     // We can't listen to the user change, because it's hard to ensure the timing is correct.
785     // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
786     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
787     GenerateLocalSourceId();
788     return (localSourceId_ != 0);
789 }
790 
TriggerVersionNegotiation(const std::string & dstTarget)791 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
792 {
793     LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
794     int errCode = E_OK;
795     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
796     if (errCode != E_OK) {
797         LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
798         return;
799     }
800 
801     TaskConfig config{true, 0, Priority::HIGH};
802     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
803     if (errCode != E_OK) {
804         LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
805         // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
806         delete buffer;
807         buffer = nullptr;
808     }
809 }
810 
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)811 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
812     const LabelType &dstLabel, const SerialBuffer *inOriFrame)
813 {
814     if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
815         return;
816     }
817     int errCode = E_OK;
818     Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
819     if (message == nullptr) {
820         if (errCode == -E_VERSION_NOT_SUPPORT) {
821             TriggerVersionNegotiation(dstTarget);
822         }
823         return;
824     }
825     // Message is release in TriggerCommunicatorNotFoundFeedback
826     TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
827 }
828 
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)829 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
830     const LabelType &dstLabel, Message* &oriMsg)
831 {
832     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
833         LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
834         // Do not have to do feedback if the message is not a request type message
835         delete oriMsg;
836         oriMsg = nullptr;
837         return;
838     }
839 
840     LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
841     oriMsg->SetMessageType(TYPE_RESPONSE);
842     oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
843 
844     int errCode = E_OK;
845     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
846     delete oriMsg;
847     oriMsg = nullptr;
848     if (errCode != E_OK) {
849         LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
850         return;
851     }
852 
853     TaskConfig config{true, 0, Priority::HIGH};
854     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
855     if (errCode != E_OK) {
856         LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
857         // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
858         delete buffer;
859         buffer = nullptr;
860     }
861 }
862 
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)863 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
864 {
865     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
866     versionMap_[target] = version;
867 }
868 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)869 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
870 {
871     if (adapterHandle_ == nullptr) {
872         return nullptr;
873     }
874     return adapterHandle_->GetExtendHeaderHandle(paramInfo);
875 }
876 
InitSendThread()877 void CommunicatorAggregator::InitSendThread()
878 {
879     if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
880         return;
881     }
882     exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this);
883     useExclusiveThread_ = true;
884 }
885 
SendOnceData()886 void CommunicatorAggregator::SendOnceData()
887 {
888     SendTask taskToSend;
889     uint32_t totalLength = 0;
890     int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
891     if (errCode != E_OK) {
892         return; // Not possible to happen
893     }
894     // <vector, extendHeadSize>
895     std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
896     uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
897     errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
898     if (errCode != E_OK) {
899         LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
900         TaskFinalizer(taskToSend, errCode);
901         return;
902     }
903     // <addr, <extendHeadSize, totalLen>>
904     std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
905     if (piecePackets.empty()) {
906         // Case that no need to split a frame, just use original buffer as a packet
907         std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
908         std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
909         entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
910         entry.second.first = taskToSend.buffer->GetExtendHeadLength();
911         entry.second.second = tmpEntry.second + entry.second.first;
912         eachPacket.push_back(entry);
913     } else {
914         for (auto &entry : piecePackets) {
915             std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
916                 {entry.second, entry.first.size()}};
917             eachPacket.push_back(tmpEntry);
918         }
919     }
920 
921     SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
922 }
923 
TriggerSendData()924 void CommunicatorAggregator::TriggerSendData()
925 {
926     if (useExclusiveThread_) {
927         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
928         wakingSignal_ = true;
929         wakingCv_.notify_one();
930         return;
931     }
932     {
933         std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
934         if (sendTaskStart_) {
935             return;
936         }
937         sendTaskStart_ = true;
938     }
939     RefObject::IncObjRef(this);
940     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
941         LOGI("[CommAggr] Send thread start.");
942         while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
943             SendOnceData();
944         }
945         {
946             std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
947             sendTaskStart_ = false;
948         }
949         if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
950             TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
951         }
952         finalizeCv_.notify_one();
953         RefObject::DecObjRef(this);
954         LOGI("[CommAggr] Send thread end.");
955     });
956     if (errCode != E_OK) {
957         LOGW("[CommAggr] Trigger send data failed %d", errCode);
958         RefObject::DecObjRef(this);
959     }
960 }
961 
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)962 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
963 {
964     std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
965     if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
966         sendRecord_[frameId].splitMtu = mtu;
967         sendRecord_[frameId].sendIndex = 0u;
968     }
969 }
970 
971 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
972 } // namespace DistributedDB
973